]> granicus.if.org Git - postgresql/blob - src/backend/utils/sort/logtape.c
Remove QNX port.
[postgresql] / src / backend / utils / sort / logtape.c
1 /*-------------------------------------------------------------------------
2  *
3  * logtape.c
4  *        Management of "logical tapes" within temporary files.
5  *
6  * This module exists to support sorting via multiple merge passes (see
7  * tuplesort.c).  Merging is an ideal algorithm for tape devices, but if
8  * we implement it on disk by creating a separate file for each "tape",
9  * there is an annoying problem: the peak space usage is at least twice
10  * the volume of actual data to be sorted.      (This must be so because each
11  * datum will appear in both the input and output tapes of the final
12  * merge pass.  For seven-tape polyphase merge, which is otherwise a
13  * pretty good algorithm, peak usage is more like 4x actual data volume.)
14  *
15  * We can work around this problem by recognizing that any one tape
16  * dataset (with the possible exception of the final output) is written
17  * and read exactly once in a perfectly sequential manner.      Therefore,
18  * a datum once read will not be required again, and we can recycle its
19  * space for use by the new tape dataset(s) being generated.  In this way,
20  * the total space usage is essentially just the actual data volume, plus
21  * insignificant bookkeeping and start/stop overhead.
22  *
23  * Few OSes allow arbitrary parts of a file to be released back to the OS,
24  * so we have to implement this space-recycling ourselves within a single
25  * logical file.  logtape.c exists to perform this bookkeeping and provide
26  * the illusion of N independent tape devices to tuplesort.c.  Note that
27  * logtape.c itself depends on buffile.c to provide a "logical file" of
28  * larger size than the underlying OS may support.
29  *
30  * For simplicity, we allocate and release space in the underlying file
31  * in BLCKSZ-size blocks.  Space allocation boils down to keeping track
32  * of which blocks in the underlying file belong to which logical tape,
33  * plus any blocks that are free (recycled and not yet reused).  Normally
34  * there are not very many free blocks, so we just keep those in a list.
35  * The blocks in each logical tape are remembered using a method borrowed
36  * from the Unix HFS filesystem: we store data block numbers in an
37  * "indirect block".  If an indirect block fills up, we write it out to
38  * the underlying file and remember its location in a second-level indirect
39  * block.  In the same way second-level blocks are remembered in third-
40  * level blocks, and so on if necessary (of course we're talking huge
41  * amounts of data here).  The topmost indirect block of a given logical
42  * tape is never actually written out to the physical file, but all lower-
43  * level indirect blocks will be.
44  *
45  * The initial write pass is guaranteed to fill the underlying file
46  * perfectly sequentially, no matter how data is divided into logical tapes.
47  * Once we begin merge passes, the access pattern becomes considerably
48  * less predictable --- but the seeking involved should be comparable to
49  * what would happen if we kept each logical tape in a separate file,
50  * so there's no serious performance penalty paid to obtain the space
51  * savings of recycling.  We try to localize the write accesses by always
52  * writing to the lowest-numbered free block when we have a choice; it's
53  * not clear this helps much, but it can't hurt.  (XXX perhaps a LIFO
54  * policy for free blocks would be better?)
55  *
56  * Since all the bookkeeping and buffer memory is allocated with palloc(),
57  * and the underlying file(s) are made with OpenTemporaryFile, all resources
58  * for a logical tape set are certain to be cleaned up even if processing
59  * is aborted by ereport(ERROR).  To avoid confusion, the caller should take
60  * care that all calls for a single LogicalTapeSet are made in the same
61  * palloc context.
62  *
63  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
64  * Portions Copyright (c) 1994, Regents of the University of California
65  *
66  * IDENTIFICATION
67  *        $PostgreSQL: pgsql/src/backend/utils/sort/logtape.c,v 1.17 2005/10/18 22:59:37 tgl Exp $
68  *
69  *-------------------------------------------------------------------------
70  */
71
72 #include "postgres.h"
73
74 #include "storage/buffile.h"
75 #include "utils/logtape.h"
76
77 /*
78  * Block indexes are "long"s, so we can fit this many per indirect block.
79  * NB: we assume this is an exact fit!
80  */
81 #define BLOCKS_PER_INDIR_BLOCK  ((int) (BLCKSZ / sizeof(long)))
82
83 /*
84  * We use a struct like this for each active indirection level of each
85  * logical tape.  If the indirect block is not the highest level of its
86  * tape, the "nextup" link points to the next higher level.  Only the
87  * "ptrs" array is written out if we have to dump the indirect block to
88  * disk.  If "ptrs" is not completely full, we store -1L in the first
89  * unused slot at completion of the write phase for the logical tape.
90  */
91 typedef struct IndirectBlock
92 {
93         int                     nextSlot;               /* next pointer slot to write or read */
94         struct IndirectBlock *nextup;           /* parent indirect level, or NULL if
95                                                                                  * top */
96         long            ptrs[BLOCKS_PER_INDIR_BLOCK];   /* indexes of contained blocks */
97 } IndirectBlock;
98
99 /*
100  * This data structure represents a single "logical tape" within the set
101  * of logical tapes stored in the same file.  We must keep track of the
102  * current partially-read-or-written data block as well as the active
103  * indirect block level(s).
104  */
105 typedef struct LogicalTape
106 {
107         IndirectBlock *indirect;        /* bottom of my indirect-block hierarchy */
108         bool            writing;                /* T while in write phase */
109         bool            frozen;                 /* T if blocks should not be freed when read */
110         bool            dirty;                  /* does buffer need to be written? */
111
112         /*
113          * The total data volume in the logical tape is numFullBlocks * BLCKSZ +
114          * lastBlockBytes.      BUT: we do not update lastBlockBytes during writing,
115          * only at completion of a write phase.
116          */
117         long            numFullBlocks;  /* number of complete blocks in log tape */
118         int                     lastBlockBytes; /* valid bytes in last (incomplete) block */
119
120         /*
121          * Buffer for current data block.  Note we don't bother to store the
122          * actual file block number of the data block (during the write phase it
123          * hasn't been assigned yet, and during read we don't care anymore). But
124          * we do need the relative block number so we can detect end-of-tape while
125          * reading.
126          */
127         long            curBlockNumber; /* this block's logical blk# within tape */
128         int                     pos;                    /* next read/write position in buffer */
129         int                     nbytes;                 /* total # of valid bytes in buffer */
130         char            buffer[BLCKSZ];
131 } LogicalTape;
132
133 /*
134  * This data structure represents a set of related "logical tapes" sharing
135  * space in a single underlying file.  (But that "file" may be multiple files
136  * if needed to escape OS limits on file size; buffile.c handles that for us.)
137  * The number of tapes is fixed at creation.
138  */
139 struct LogicalTapeSet
140 {
141         BufFile    *pfile;                      /* underlying file for whole tape set */
142         long            nFileBlocks;    /* # of blocks used in underlying file */
143
144         /*
145          * We store the numbers of recycled-and-available blocks in freeBlocks[].
146          * When there are no such blocks, we extend the underlying file.  Note
147          * that the block numbers in freeBlocks are always in *decreasing* order,
148          * so that removing the last entry gives us the lowest free block.
149          */
150         long       *freeBlocks;         /* resizable array */
151         int                     nFreeBlocks;    /* # of currently free blocks */
152         int                     freeBlocksLen;  /* current allocated length of freeBlocks[] */
153
154         /*
155          * tapes[] is declared size 1 since C wants a fixed size, but actually it
156          * is of length nTapes.
157          */
158         int                     nTapes;                 /* # of logical tapes in set */
159         LogicalTape *tapes[1];          /* must be last in struct! */
160 };
161
162 static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
163 static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
164 static long ltsGetFreeBlock(LogicalTapeSet *lts);
165 static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
166 static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
167                                   long blocknum);
168 static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
169                                            IndirectBlock *indirect,
170                                            bool freezing);
171 static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
172                                                          IndirectBlock *indirect);
173 static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
174                                           IndirectBlock *indirect,
175                                           bool frozen);
176 static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
177                                           IndirectBlock *indirect);
178 static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
179
180
181 /*
182  * Write a block-sized buffer to the specified block of the underlying file.
183  *
184  * NB: should not attempt to write beyond current end of file (ie, create
185  * "holes" in file), since BufFile doesn't allow that.  The first write pass
186  * must write blocks sequentially.
187  *
188  * No need for an error return convention; we ereport() on any error.
189  */
190 static void
191 ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
192 {
193         if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
194                 BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
195                 ereport(ERROR,
196                 /* XXX is it okay to assume errno is correct? */
197                                 (errcode_for_file_access(),
198                                  errmsg("could not write block %ld of temporary file: %m",
199                                                 blocknum),
200                                  errhint("Perhaps out of disk space?")));
201 }
202
203 /*
204  * Read a block-sized buffer from the specified block of the underlying file.
205  *
206  * No need for an error return convention; we ereport() on any error.   This
207  * module should never attempt to read a block it doesn't know is there.
208  */
209 static void
210 ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
211 {
212         if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
213                 BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
214                 ereport(ERROR,
215                 /* XXX is it okay to assume errno is correct? */
216                                 (errcode_for_file_access(),
217                                  errmsg("could not read block %ld of temporary file: %m",
218                                                 blocknum)));
219 }
220
221 /*
222  * Select a currently unused block for writing to.
223  *
224  * NB: should only be called when writer is ready to write immediately,
225  * to ensure that first write pass is sequential.
226  */
227 static long
228 ltsGetFreeBlock(LogicalTapeSet *lts)
229 {
230         /*
231          * If there are multiple free blocks, we select the one appearing last in
232          * freeBlocks[].  If there are none, assign the next block at the end of
233          * the file.
234          */
235         if (lts->nFreeBlocks > 0)
236                 return lts->freeBlocks[--lts->nFreeBlocks];
237         else
238                 return lts->nFileBlocks++;
239 }
240
241 /*
242  * Return a block# to the freelist.
243  */
244 static void
245 ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
246 {
247         int                     ndx;
248         long       *ptr;
249
250         /*
251          * Enlarge freeBlocks array if full.
252          */
253         if (lts->nFreeBlocks >= lts->freeBlocksLen)
254         {
255                 lts->freeBlocksLen *= 2;
256                 lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
257                                                                                   lts->freeBlocksLen * sizeof(long));
258         }
259
260         /*
261          * Insert blocknum into array, preserving decreasing order (so that
262          * ltsGetFreeBlock returns the lowest available block number). This could
263          * get fairly slow if there were many free blocks, but we don't expect
264          * there to be very many at one time.
265          */
266         ndx = lts->nFreeBlocks++;
267         ptr = lts->freeBlocks + ndx;
268         while (ndx > 0 && ptr[-1] < blocknum)
269         {
270                 ptr[0] = ptr[-1];
271                 ndx--, ptr--;
272         }
273         ptr[0] = blocknum;
274 }
275
276 /*
277  * These routines manipulate indirect-block hierarchies.  All are recursive
278  * so that they don't have any specific limit on the depth of hierarchy.
279  */
280
281 /*
282  * Record a data block number in a logical tape's lowest indirect block,
283  * or record an indirect block's number in the next higher indirect level.
284  */
285 static void
286 ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
287                                   long blocknum)
288 {
289         if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
290         {
291                 /*
292                  * This indirect block is full, so dump it out and recursively save
293                  * its address in the next indirection level.  Create a new
294                  * indirection level if there wasn't one before.
295                  */
296                 long            indirblock = ltsGetFreeBlock(lts);
297
298                 ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
299                 if (indirect->nextup == NULL)
300                 {
301                         indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
302                         indirect->nextup->nextSlot = 0;
303                         indirect->nextup->nextup = NULL;
304                 }
305                 ltsRecordBlockNum(lts, indirect->nextup, indirblock);
306
307                 /*
308                  * Reset to fill another indirect block at this level.
309                  */
310                 indirect->nextSlot = 0;
311         }
312         indirect->ptrs[indirect->nextSlot++] = blocknum;
313 }
314
315 /*
316  * Reset a logical tape's indirect-block hierarchy after a write pass
317  * to prepare for reading.      We dump out partly-filled blocks except
318  * at the top of the hierarchy, and we rewind each level to the start.
319  * This call returns the first data block number, or -1L if the tape
320  * is empty.
321  *
322  * Unless 'freezing' is true, release indirect blocks to the free pool after
323  * reading them.
324  */
325 static long
326 ltsRewindIndirectBlock(LogicalTapeSet *lts,
327                                            IndirectBlock *indirect,
328                                            bool freezing)
329 {
330         /* Insert sentinel if block is not full */
331         if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
332                 indirect->ptrs[indirect->nextSlot] = -1L;
333
334         /*
335          * If block is not topmost, write it out, and recurse to obtain address of
336          * first block in this hierarchy level.  Read that one in.
337          */
338         if (indirect->nextup != NULL)
339         {
340                 long            indirblock = ltsGetFreeBlock(lts);
341
342                 ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
343                 ltsRecordBlockNum(lts, indirect->nextup, indirblock);
344                 indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
345                 Assert(indirblock != -1L);
346                 ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
347                 if (!freezing)
348                         ltsReleaseBlock(lts, indirblock);
349         }
350
351         /*
352          * Reset my next-block pointer, and then fetch a block number if any.
353          */
354         indirect->nextSlot = 0;
355         if (indirect->ptrs[0] == -1L)
356                 return -1L;
357         return indirect->ptrs[indirect->nextSlot++];
358 }
359
360 /*
361  * Rewind a previously-frozen indirect-block hierarchy for another read pass.
362  * This call returns the first data block number, or -1L if the tape
363  * is empty.
364  */
365 static long
366 ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
367                                                          IndirectBlock *indirect)
368 {
369         /*
370          * If block is not topmost, recurse to obtain address of first block in
371          * this hierarchy level.  Read that one in.
372          */
373         if (indirect->nextup != NULL)
374         {
375                 long            indirblock;
376
377                 indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
378                 Assert(indirblock != -1L);
379                 ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
380         }
381
382         /*
383          * Reset my next-block pointer, and then fetch a block number if any.
384          */
385         indirect->nextSlot = 0;
386         if (indirect->ptrs[0] == -1L)
387                 return -1L;
388         return indirect->ptrs[indirect->nextSlot++];
389 }
390
391 /*
392  * Obtain next data block number in the forward direction, or -1L if no more.
393  *
394  * Unless 'frozen' is true, release indirect blocks to the free pool after
395  * reading them.
396  */
397 static long
398 ltsRecallNextBlockNum(LogicalTapeSet *lts,
399                                           IndirectBlock *indirect,
400                                           bool frozen)
401 {
402         if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
403                 indirect->ptrs[indirect->nextSlot] == -1L)
404         {
405                 long            indirblock;
406
407                 if (indirect->nextup == NULL)
408                         return -1L;                     /* nothing left at this level */
409                 indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
410                 if (indirblock == -1L)
411                         return -1L;                     /* nothing left at this level */
412                 ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
413                 if (!frozen)
414                         ltsReleaseBlock(lts, indirblock);
415                 indirect->nextSlot = 0;
416         }
417         if (indirect->ptrs[indirect->nextSlot] == -1L)
418                 return -1L;
419         return indirect->ptrs[indirect->nextSlot++];
420 }
421
422 /*
423  * Obtain next data block number in the reverse direction, or -1L if no more.
424  *
425  * Note this fetches the block# before the one last returned, no matter which
426  * direction of call returned that one.  If we fail, no change in state.
427  *
428  * This routine can only be used in 'frozen' state, so there's no need to
429  * pass a parameter telling whether to release blocks ... we never do.
430  */
431 static long
432 ltsRecallPrevBlockNum(LogicalTapeSet *lts,
433                                           IndirectBlock *indirect)
434 {
435         if (indirect->nextSlot <= 1)
436         {
437                 long            indirblock;
438
439                 if (indirect->nextup == NULL)
440                         return -1L;                     /* nothing left at this level */
441                 indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
442                 if (indirblock == -1L)
443                         return -1L;                     /* nothing left at this level */
444                 ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
445
446                 /*
447                  * The previous block would only have been written out if full, so we
448                  * need not search it for a -1 sentinel.
449                  */
450                 indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK + 1;
451         }
452         indirect->nextSlot--;
453         return indirect->ptrs[indirect->nextSlot - 1];
454 }
455
456
457 /*
458  * Create a set of logical tapes in a temporary underlying file.
459  *
460  * Each tape is initialized in write state.
461  */
462 LogicalTapeSet *
463 LogicalTapeSetCreate(int ntapes)
464 {
465         LogicalTapeSet *lts;
466         LogicalTape *lt;
467         int                     i;
468
469         /*
470          * Create top-level struct.  First LogicalTape pointer is already counted
471          * in sizeof(LogicalTapeSet).
472          */
473         Assert(ntapes > 0);
474         lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet) +
475                                                                         (ntapes - 1) *sizeof(LogicalTape *));
476         lts->pfile = BufFileCreateTemp(false);
477         lts->nFileBlocks = 0L;
478         lts->freeBlocksLen = 32;        /* reasonable initial guess */
479         lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
480         lts->nFreeBlocks = 0;
481         lts->nTapes = ntapes;
482
483         /*
484          * Create per-tape structs, including first-level indirect blocks.
485          */
486         for (i = 0; i < ntapes; i++)
487         {
488                 lt = (LogicalTape *) palloc(sizeof(LogicalTape));
489                 lts->tapes[i] = lt;
490                 lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
491                 lt->indirect->nextSlot = 0;
492                 lt->indirect->nextup = NULL;
493                 lt->writing = true;
494                 lt->frozen = false;
495                 lt->dirty = false;
496                 lt->numFullBlocks = 0L;
497                 lt->lastBlockBytes = 0;
498                 lt->curBlockNumber = 0L;
499                 lt->pos = 0;
500                 lt->nbytes = 0;
501         }
502         return lts;
503 }
504
505 /*
506  * Close a logical tape set and release all resources.
507  */
508 void
509 LogicalTapeSetClose(LogicalTapeSet *lts)
510 {
511         LogicalTape *lt;
512         IndirectBlock *ib,
513                            *nextib;
514         int                     i;
515
516         BufFileClose(lts->pfile);
517         for (i = 0; i < lts->nTapes; i++)
518         {
519                 lt = lts->tapes[i];
520                 for (ib = lt->indirect; ib != NULL; ib = nextib)
521                 {
522                         nextib = ib->nextup;
523                         pfree(ib);
524                 }
525                 pfree(lt);
526         }
527         pfree(lts->freeBlocks);
528         pfree(lts);
529 }
530
531 /*
532  * Dump the dirty buffer of a logical tape.
533  */
534 static void
535 ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
536 {
537         long            datablock = ltsGetFreeBlock(lts);
538
539         Assert(lt->dirty);
540         ltsWriteBlock(lts, datablock, (void *) lt->buffer);
541         ltsRecordBlockNum(lts, lt->indirect, datablock);
542         lt->dirty = false;
543         /* Caller must do other state update as needed */
544 }
545
546 /*
547  * Write to a logical tape.
548  *
549  * There are no error returns; we ereport() on failure.
550  */
551 void
552 LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
553                                  void *ptr, size_t size)
554 {
555         LogicalTape *lt;
556         size_t          nthistime;
557
558         Assert(tapenum >= 0 && tapenum < lts->nTapes);
559         lt = lts->tapes[tapenum];
560         Assert(lt->writing);
561
562         while (size > 0)
563         {
564                 if (lt->pos >= BLCKSZ)
565                 {
566                         /* Buffer full, dump it out */
567                         if (lt->dirty)
568                                 ltsDumpBuffer(lts, lt);
569                         else
570                         {
571                                 /* Hmm, went directly from reading to writing? */
572                                 elog(ERROR, "invalid logtape state: should be dirty");
573                         }
574                         lt->numFullBlocks++;
575                         lt->curBlockNumber++;
576                         lt->pos = 0;
577                         lt->nbytes = 0;
578                 }
579
580                 nthistime = BLCKSZ - lt->pos;
581                 if (nthistime > size)
582                         nthistime = size;
583                 Assert(nthistime > 0);
584
585                 memcpy(lt->buffer + lt->pos, ptr, nthistime);
586
587                 lt->dirty = true;
588                 lt->pos += nthistime;
589                 if (lt->nbytes < lt->pos)
590                         lt->nbytes = lt->pos;
591                 ptr = (void *) ((char *) ptr + nthistime);
592                 size -= nthistime;
593         }
594 }
595
596 /*
597  * Rewind logical tape and switch from writing to reading or vice versa.
598  *
599  * Unless the tape has been "frozen" in read state, forWrite must be the
600  * opposite of the previous tape state.
601  */
602 void
603 LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
604 {
605         LogicalTape *lt;
606         long            datablocknum;
607
608         Assert(tapenum >= 0 && tapenum < lts->nTapes);
609         lt = lts->tapes[tapenum];
610
611         if (!forWrite)
612         {
613                 if (lt->writing)
614                 {
615                         /*
616                          * Completion of a write phase.  Flush last partial data block,
617                          * flush any partial indirect blocks, rewind for normal
618                          * (destructive) read.
619                          */
620                         if (lt->dirty)
621                                 ltsDumpBuffer(lts, lt);
622                         lt->lastBlockBytes = lt->nbytes;
623                         lt->writing = false;
624                         datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
625                 }
626                 else
627                 {
628                         /*
629                          * This is only OK if tape is frozen; we rewind for (another) read
630                          * pass.
631                          */
632                         Assert(lt->frozen);
633                         datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
634                 }
635                 /* Read the first block, or reset if tape is empty */
636                 lt->curBlockNumber = 0L;
637                 lt->pos = 0;
638                 lt->nbytes = 0;
639                 if (datablocknum != -1L)
640                 {
641                         ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
642                         if (!lt->frozen)
643                                 ltsReleaseBlock(lts, datablocknum);
644                         lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
645                                 BLCKSZ : lt->lastBlockBytes;
646                 }
647         }
648         else
649         {
650                 /*
651                  * Completion of a read phase.  Rewind and prepare for write.
652                  *
653                  * NOTE: we assume the caller has read the tape to the end; otherwise
654                  * untouched data and indirect blocks will not have been freed. We
655                  * could add more code to free any unread blocks, but in current usage
656                  * of this module it'd be useless code.
657                  */
658                 IndirectBlock *ib,
659                                    *nextib;
660
661                 Assert(!lt->writing && !lt->frozen);
662                 /* Must truncate the indirect-block hierarchy down to one level. */
663                 for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
664                 {
665                         nextib = ib->nextup;
666                         pfree(ib);
667                 }
668                 lt->indirect->nextSlot = 0;
669                 lt->indirect->nextup = NULL;
670                 lt->writing = true;
671                 lt->dirty = false;
672                 lt->numFullBlocks = 0L;
673                 lt->lastBlockBytes = 0;
674                 lt->curBlockNumber = 0L;
675                 lt->pos = 0;
676                 lt->nbytes = 0;
677         }
678 }
679
680 /*
681  * Read from a logical tape.
682  *
683  * Early EOF is indicated by return value less than #bytes requested.
684  */
685 size_t
686 LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
687                                 void *ptr, size_t size)
688 {
689         LogicalTape *lt;
690         size_t          nread = 0;
691         size_t          nthistime;
692
693         Assert(tapenum >= 0 && tapenum < lts->nTapes);
694         lt = lts->tapes[tapenum];
695         Assert(!lt->writing);
696
697         while (size > 0)
698         {
699                 if (lt->pos >= lt->nbytes)
700                 {
701                         /* Try to load more data into buffer. */
702                         long            datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
703                                                                                                                          lt->frozen);
704
705                         if (datablocknum == -1L)
706                                 break;                  /* EOF */
707                         lt->curBlockNumber++;
708                         lt->pos = 0;
709                         ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
710                         if (!lt->frozen)
711                                 ltsReleaseBlock(lts, datablocknum);
712                         lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
713                                 BLCKSZ : lt->lastBlockBytes;
714                         if (lt->nbytes <= 0)
715                                 break;                  /* EOF (possible here?) */
716                 }
717
718                 nthistime = lt->nbytes - lt->pos;
719                 if (nthistime > size)
720                         nthistime = size;
721                 Assert(nthistime > 0);
722
723                 memcpy(ptr, lt->buffer + lt->pos, nthistime);
724
725                 lt->pos += nthistime;
726                 ptr = (void *) ((char *) ptr + nthistime);
727                 size -= nthistime;
728                 nread += nthistime;
729         }
730
731         return nread;
732 }
733
734 /*
735  * "Freeze" the contents of a tape so that it can be read multiple times
736  * and/or read backwards.  Once a tape is frozen, its contents will not
737  * be released until the LogicalTapeSet is destroyed.  This is expected
738  * to be used only for the final output pass of a merge.
739  *
740  * This *must* be called just at the end of a write pass, before the
741  * tape is rewound (after rewind is too late!).  It performs a rewind
742  * and switch to read mode "for free".  An immediately following rewind-
743  * for-read call is OK but not necessary.
744  */
745 void
746 LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
747 {
748         LogicalTape *lt;
749         long            datablocknum;
750
751         Assert(tapenum >= 0 && tapenum < lts->nTapes);
752         lt = lts->tapes[tapenum];
753         Assert(lt->writing);
754
755         /*
756          * Completion of a write phase.  Flush last partial data block, flush any
757          * partial indirect blocks, rewind for nondestructive read.
758          */
759         if (lt->dirty)
760                 ltsDumpBuffer(lts, lt);
761         lt->lastBlockBytes = lt->nbytes;
762         lt->writing = false;
763         lt->frozen = true;
764         datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
765         /* Read the first block, or reset if tape is empty */
766         lt->curBlockNumber = 0L;
767         lt->pos = 0;
768         lt->nbytes = 0;
769         if (datablocknum != -1L)
770         {
771                 ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
772                 lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
773                         BLCKSZ : lt->lastBlockBytes;
774         }
775 }
776
777 /*
778  * Backspace the tape a given number of bytes.  (We also support a more
779  * general seek interface, see below.)
780  *
781  * *Only* a frozen-for-read tape can be backed up; we don't support
782  * random access during write, and an unfrozen read tape may have
783  * already discarded the desired data!
784  *
785  * Return value is TRUE if seek successful, FALSE if there isn't that much
786  * data before the current point (in which case there's no state change).
787  */
788 bool
789 LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
790 {
791         LogicalTape *lt;
792         long            nblocks;
793         int                     newpos;
794
795         Assert(tapenum >= 0 && tapenum < lts->nTapes);
796         lt = lts->tapes[tapenum];
797         Assert(lt->frozen);
798
799         /*
800          * Easy case for seek within current block.
801          */
802         if (size <= (size_t) lt->pos)
803         {
804                 lt->pos -= (int) size;
805                 return true;
806         }
807
808         /*
809          * Not-so-easy case.  Figure out whether it's possible at all.
810          */
811         size -= (size_t) lt->pos;       /* part within this block */
812         nblocks = size / BLCKSZ;
813         size = size % BLCKSZ;
814         if (size)
815         {
816                 nblocks++;
817                 newpos = (int) (BLCKSZ - size);
818         }
819         else
820                 newpos = 0;
821         if (nblocks > lt->curBlockNumber)
822                 return false;                   /* a seek too far... */
823
824         /*
825          * OK, we need to back up nblocks blocks.  This implementation would be
826          * pretty inefficient for long seeks, but we really aren't expecting that
827          * (a seek over one tuple is typical).
828          */
829         while (nblocks-- > 0)
830         {
831                 long            datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
832
833                 if (datablocknum == -1L)
834                         elog(ERROR, "unexpected end of tape");
835                 lt->curBlockNumber--;
836                 if (nblocks == 0)
837                 {
838                         ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
839                         lt->nbytes = BLCKSZ;
840                 }
841         }
842         lt->pos = newpos;
843         return true;
844 }
845
846 /*
847  * Seek to an arbitrary position in a logical tape.
848  *
849  * *Only* a frozen-for-read tape can be seeked.
850  *
851  * Return value is TRUE if seek successful, FALSE if there isn't that much
852  * data in the tape (in which case there's no state change).
853  */
854 bool
855 LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
856                                 long blocknum, int offset)
857 {
858         LogicalTape *lt;
859
860         Assert(tapenum >= 0 && tapenum < lts->nTapes);
861         lt = lts->tapes[tapenum];
862         Assert(lt->frozen);
863         Assert(offset >= 0 && offset <= BLCKSZ);
864
865         /*
866          * Easy case for seek within current block.
867          */
868         if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
869         {
870                 lt->pos = offset;
871                 return true;
872         }
873
874         /*
875          * Not-so-easy case.  Figure out whether it's possible at all.
876          */
877         if (blocknum < 0 || blocknum > lt->numFullBlocks ||
878                 (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
879                 return false;
880
881         /*
882          * OK, advance or back up to the target block.  This implementation would
883          * be pretty inefficient for long seeks, but we really aren't expecting
884          * that (a seek over one tuple is typical).
885          */
886         while (lt->curBlockNumber > blocknum)
887         {
888                 long            datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
889
890                 if (datablocknum == -1L)
891                         elog(ERROR, "unexpected end of tape");
892                 if (--lt->curBlockNumber == blocknum)
893                         ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
894         }
895         while (lt->curBlockNumber < blocknum)
896         {
897                 long            datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
898                                                                                                                  lt->frozen);
899
900                 if (datablocknum == -1L)
901                         elog(ERROR, "unexpected end of tape");
902                 if (++lt->curBlockNumber == blocknum)
903                         ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
904         }
905         lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
906                 BLCKSZ : lt->lastBlockBytes;
907         lt->pos = offset;
908         return true;
909 }
910
911 /*
912  * Obtain current position in a form suitable for a later LogicalTapeSeek.
913  *
914  * NOTE: it'd be OK to do this during write phase with intention of using
915  * the position for a seek after freezing.      Not clear if anyone needs that.
916  */
917 void
918 LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
919                                 long *blocknum, int *offset)
920 {
921         LogicalTape *lt;
922
923         Assert(tapenum >= 0 && tapenum < lts->nTapes);
924         lt = lts->tapes[tapenum];
925         *blocknum = lt->curBlockNumber;
926         *offset = lt->pos;
927 }
928
929 /*
930  * Obtain total disk space currently used by a LogicalTapeSet, in blocks.
931  */
932 long
933 LogicalTapeSetBlocks(LogicalTapeSet *lts)
934 {
935         return lts->nFileBlocks;
936 }