* Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/file/buffile.c,v 1.1 1999/10/13 15:02:29 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/file/buffile.c,v 1.2 1999/10/16 19:49:26 tgl Exp $
*
* NOTES:
*
*
* BufFile also supports temporary files that exceed the OS file size limit
* (by opening multiple fd.c temporary files). This is an essential feature
- * for sorts and hashjoins on large amounts of data. It is possible to have
- * more than one BufFile reading/writing the same temp file, although the
- * caller is responsible for avoiding ill effects from buffer overlap when
- * this is done.
+ * for sorts and hashjoins on large amounts of data.
*-------------------------------------------------------------------------
*/
#define MAX_PHYSICAL_FILESIZE (RELSEG_SIZE * BLCKSZ)
/*
- * To handle multiple BufFiles on a single logical temp file, we use this
- * data structure representing a logical file (which can be made up of
- * multiple physical files to get around the OS file size limit).
+ * This data structure represents a buffered file that consists of one or
+ * more physical files (each accessed through a virtual file descriptor
+ * managed by fd.c).
*/
-typedef struct LogicalFile
+struct BufFile
{
- int refCount; /* number of BufFiles using me */
- bool isTemp; /* can only add files if this is TRUE */
int numFiles; /* number of physical files in set */
/* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */
-
File *files; /* palloc'd array with numFiles entries */
long *offsets; /* palloc'd array with numFiles entries */
/* offsets[i] is the current seek position of files[i]. We use this
* to avoid making redundant FileSeek calls.
*/
-} LogicalFile;
-/*
- * A single file buffer looks like this.
- */
-struct BufFile
-{
- LogicalFile *logFile; /* the underlying LogicalFile */
+ bool isTemp; /* can only add files if this is TRUE */
bool dirty; /* does buffer need to be written? */
/*
- * "current pos" is position of start of buffer within LogicalFile.
+ * "current pos" is position of start of buffer within the logical file.
* Position as seen by user of BufFile is (curFile, curOffset + pos).
*/
int curFile; /* file index (0..n) part of current pos */
char buffer[BLCKSZ];
};
-static LogicalFile *makeLogicalFile(File firstfile);
-static void extendLogicalFile(LogicalFile *file);
-static void deleteLogicalFile(LogicalFile *file);
+static BufFile *makeBufFile(File firstfile);
+static void extendBufFile(BufFile *file);
static void BufFileLoadBuffer(BufFile *file);
static void BufFileDumpBuffer(BufFile *file);
static int BufFileFlush(BufFile *file);
/*
- * Create a LogicalFile with one component file and refcount 1.
+ * Create a BufFile given the first underlying physical file.
* NOTE: caller must set isTemp true if appropriate.
*/
-static LogicalFile *
-makeLogicalFile(File firstfile)
+static BufFile *
+makeBufFile(File firstfile)
{
- LogicalFile *file = (LogicalFile *) palloc(sizeof(LogicalFile));
+ BufFile *file = (BufFile *) palloc(sizeof(BufFile));
- file->refCount = 1;
- file->isTemp = false;
file->numFiles = 1;
file->files = (File *) palloc(sizeof(File));
file->files[0] = firstfile;
file->offsets = (long *) palloc(sizeof(long));
file->offsets[0] = 0L;
+ file->isTemp = false;
+ file->dirty = false;
+ file->curFile = 0;
+ file->curOffset = 0L;
+ file->pos = 0;
+ file->nbytes = 0;
return file;
}
* Add another component temp file.
*/
static void
-extendLogicalFile(LogicalFile *file)
+extendBufFile(BufFile *file)
{
File pfile;
file->numFiles++;
}
-/*
- * Close and delete a LogicalFile when its refCount has gone to zero.
- */
-static void
-deleteLogicalFile(LogicalFile *file)
-{
- int i;
-
- for (i = 0; i < file->numFiles; i++)
- FileClose(file->files[i]);
- pfree(file->files);
- pfree(file->offsets);
- pfree(file);
-}
-
/*
* Create a BufFile for a new temporary file (which will expand to become
* multiple temporary files if more than MAX_PHYSICAL_FILESIZE bytes are
BufFile *
BufFileCreateTemp(void)
{
- BufFile *bfile = (BufFile *) palloc(sizeof(BufFile));
+ BufFile *file;
File pfile;
- LogicalFile *lfile;
pfile = OpenTemporaryFile();
Assert(pfile >= 0);
- lfile = makeLogicalFile(pfile);
- lfile->isTemp = true;
-
- bfile->logFile = lfile;
- bfile->dirty = false;
- bfile->curFile = 0;
- bfile->curOffset = 0L;
- bfile->pos = 0;
- bfile->nbytes = 0;
+ file = makeBufFile(pfile);
+ file->isTemp = true;
- return bfile;
+ return file;
}
/*
BufFile *
BufFileCreate(File file)
{
- BufFile *bfile = (BufFile *) palloc(sizeof(BufFile));
- LogicalFile *lfile;
-
- lfile = makeLogicalFile(file);
-
- bfile->logFile = lfile;
- bfile->dirty = false;
- bfile->curFile = 0;
- bfile->curOffset = 0L;
- bfile->pos = 0;
- bfile->nbytes = 0;
-
- return bfile;
-}
-
-/*
- * Create an additional BufFile accessing the same underlying file as an
- * existing BufFile. This is useful for having multiple read/write access
- * positions in a single temporary file. Note the caller is responsible
- * for avoiding trouble due to overlapping buffer positions! (Caller may
- * assume that buffer size is BLCKSZ...)
- */
-BufFile *
-BufFileReaccess(BufFile *file)
-{
- BufFile *bfile = (BufFile *) palloc(sizeof(BufFile));
-
- bfile->logFile = file->logFile;
- bfile->logFile->refCount++;
- bfile->dirty = false;
- bfile->curFile = 0;
- bfile->curOffset = 0L;
- bfile->pos = 0;
- bfile->nbytes = 0;
-
- return bfile;
+ return makeBufFile(file);
}
/*
void
BufFileClose(BufFile *file)
{
+ int i;
+
/* flush any unwritten data */
BufFileFlush(file);
- /* close the underlying (with delete if it's a temp file) */
- if (--(file->logFile->refCount) <= 0)
- deleteLogicalFile(file->logFile);
+ /* close the underlying file(s) (with delete if it's a temp file) */
+ for (i = 0; i < file->numFiles; i++)
+ FileClose(file->files[i]);
/* release the buffer space */
+ pfree(file->files);
+ pfree(file->offsets);
pfree(file);
}
-/* BufFileLoadBuffer
+/*
+ * BufFileLoadBuffer
*
* Load some data into buffer, if possible, starting from curOffset.
* At call, must have dirty = false, pos and nbytes = 0.
static void
BufFileLoadBuffer(BufFile *file)
{
- LogicalFile *lfile = file->logFile;
File thisfile;
/*
* MAX_PHYSICAL_FILESIZE.
*/
if (file->curOffset >= MAX_PHYSICAL_FILESIZE &&
- file->curFile+1 < lfile->numFiles)
+ file->curFile+1 < file->numFiles)
{
file->curFile++;
file->curOffset = 0L;
}
- thisfile = lfile->files[file->curFile];
/*
- * May need to reposition physical file, if more than one BufFile
- * is using it.
+ * May need to reposition physical file.
*/
- if (file->curOffset != lfile->offsets[file->curFile])
+ thisfile = file->files[file->curFile];
+ if (file->curOffset != file->offsets[file->curFile])
{
if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset)
return; /* seek failed, read nothing */
- lfile->offsets[file->curFile] = file->curOffset;
+ file->offsets[file->curFile] = file->curOffset;
}
+ /*
+ * Read whatever we can get, up to a full bufferload.
+ */
file->nbytes = FileRead(thisfile, file->buffer, sizeof(file->buffer));
if (file->nbytes < 0)
file->nbytes = 0;
- lfile->offsets[file->curFile] += file->nbytes;
+ file->offsets[file->curFile] += file->nbytes;
/* we choose not to advance curOffset here */
}
-/* BufFileDumpBuffer
+/*
+ * BufFileDumpBuffer
*
* Dump buffer contents starting at curOffset.
* At call, should have dirty = true, nbytes > 0.
static void
BufFileDumpBuffer(BufFile *file)
{
- LogicalFile *lfile = file->logFile;
int wpos = 0;
int bytestowrite;
File thisfile;
/*
* Advance to next component file if necessary and possible.
*/
- if (file->curOffset >= MAX_PHYSICAL_FILESIZE && lfile->isTemp)
+ if (file->curOffset >= MAX_PHYSICAL_FILESIZE && file->isTemp)
{
- while (file->curFile+1 >= lfile->numFiles)
- extendLogicalFile(lfile);
+ while (file->curFile+1 >= file->numFiles)
+ extendBufFile(file);
file->curFile++;
file->curOffset = 0L;
}
* to write as much as asked...
*/
bytestowrite = file->nbytes - wpos;
- if (lfile->isTemp)
+ if (file->isTemp)
{
long availbytes = MAX_PHYSICAL_FILESIZE - file->curOffset;
if ((long) bytestowrite > availbytes)
bytestowrite = (int) availbytes;
}
- thisfile = lfile->files[file->curFile];
/*
- * May need to reposition physical file, if more than one BufFile
- * is using it.
+ * May need to reposition physical file.
*/
- if (file->curOffset != lfile->offsets[file->curFile])
+ thisfile = file->files[file->curFile];
+ if (file->curOffset != file->offsets[file->curFile])
{
if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset)
return; /* seek failed, give up */
- lfile->offsets[file->curFile] = file->curOffset;
+ file->offsets[file->curFile] = file->curOffset;
}
bytestowrite = FileWrite(thisfile, file->buffer, bytestowrite);
if (bytestowrite <= 0)
return; /* failed to write */
- lfile->offsets[file->curFile] += bytestowrite;
+ file->offsets[file->curFile] += bytestowrite;
file->curOffset += bytestowrite;
wpos += bytestowrite;
}
file->nbytes = 0;
}
-/* BufFileRead
+/*
+ * BufFileRead
*
* Like fread() except we assume 1-byte element size.
*/
return nread;
}
-/* BufFileWrite
+/*
+ * BufFileWrite
*
* Like fwrite() except we assume 1-byte element size.
*/
return nwritten;
}
-/* BufFileFlush
+/*
+ * BufFileFlush
*
* Like fflush()
*/
return 0;
}
-/* BufFileSeek
+/*
+ * BufFileSeek
*
- * Like fseek(). Result is 0 if OK, EOF if not.
+ * Like fseek(), except that target position needs two values in order to
+ * work when logical filesize exceeds maximum value representable by long.
+ * We do not support relative seeks across more than LONG_MAX, however.
+ *
+ * Result is 0 if OK, EOF if not. Logical position is not moved if an
+ * impossible seek is attempted.
*/
int
BufFileSeek(BufFile *file, int fileno, long offset, int whence)
switch (whence)
{
case SEEK_SET:
- if (fileno < 0 || fileno >= file->logFile->numFiles ||
+ if (fileno < 0 || fileno >= file->numFiles ||
offset < 0)
return EOF;
newFile = fileno;
return EOF;
newOffset += MAX_PHYSICAL_FILESIZE;
}
- if (file->logFile->isTemp)
+ if (file->isTemp)
{
while (newOffset > MAX_PHYSICAL_FILESIZE)
{
- if (++newFile >= file->logFile->numFiles)
+ if (++newFile >= file->numFiles)
return EOF;
newOffset -= MAX_PHYSICAL_FILESIZE;
}
return 0;
}
-extern void
+void
BufFileTell(BufFile *file, int *fileno, long *offset)
{
*fileno = file->curFile;
*offset = file->curOffset + file->pos;
}
+
+/*
+ * BufFileSeekBlock --- block-oriented seek
+ *
+ * Performs absolute seek to the start of the n'th BLCKSZ-sized block of
+ * the file. Note that users of this interface will fail if their files
+ * exceed BLCKSZ * LONG_MAX bytes, but that is quite a lot; we don't work
+ * with tables bigger than that, either...
+ *
+ * Result is 0 if OK, EOF if not. Logical position is not moved if an
+ * impossible seek is attempted.
+ */
+int
+BufFileSeekBlock(BufFile *file, long blknum)
+{
+ return BufFileSeek(file,
+ (int) (blknum / RELSEG_SIZE),
+ (blknum % RELSEG_SIZE) * BLCKSZ,
+ SEEK_SET);
+}
+
+/*
+ * BufFileTellBlock --- block-oriented tell
+ *
+ * Any fractional part of a block in the current seek position is ignored.
+ */
+long
+BufFileTellBlock(BufFile *file)
+{
+ long blknum;
+
+ blknum = (file->curOffset + file->pos) / BLCKSZ;
+ blknum += file->curFile * RELSEG_SIZE;
+ return blknum;
+}
# Makefile for utils/sort
#
# IDENTIFICATION
-# $Header: /cvsroot/pgsql/src/backend/utils/sort/Makefile,v 1.5 1998/04/06 00:27:37 momjian Exp $
+# $Header: /cvsroot/pgsql/src/backend/utils/sort/Makefile,v 1.6 1999/10/16 19:49:27 tgl Exp $
#
#-------------------------------------------------------------------------
CFLAGS += -I../..
-OBJS = lselect.o psort.o
+OBJS = logtape.o lselect.o psort.o
all: SUBSYS.o
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * logtape.c
+ * Management of "logical tapes" within temporary files.
+ *
+ * This module exists to support sorting via multiple merge passes (see
+ * psort.c). Merging is an ideal algorithm for tape devices, but if we
+ * implement it on disk by creating a separate file for each "tape",
+ * there is an annoying problem: the peak space usage is at least twice
+ * the volume of actual data to be sorted. (This must be so because each
+ * datum will appear in both the input and output tapes of the final
+ * merge pass. For seven-tape polyphase merge, which is otherwise a
+ * pretty good algorithm, peak usage is more like 4x actual data volume.)
+ *
+ * We can work around this problem by recognizing that any one tape
+ * dataset (with the possible exception of the final output) is written
+ * and read exactly once in a perfectly sequential manner. Therefore,
+ * a datum once read will not be required again, and we can recycle its
+ * space for use by the new tape dataset(s) being generated. In this way,
+ * the total space usage is essentially just the actual data volume, plus
+ * insignificant bookkeeping and start/stop overhead.
+ *
+ * Few OSes allow arbitrary parts of a file to be released back to the OS,
+ * so we have to implement this space-recycling ourselves within a single
+ * logical file. logtape.c exists to perform this bookkeeping and provide
+ * the illusion of N independent tape devices to psort.c. Note that
+ * logtape.c itself depends on buffile.c to provide a "logical file" of
+ * larger size than the underlying OS may support.
+ *
+ * For simplicity, we allocate and release space in the underlying file
+ * in BLCKSZ-size blocks. Space allocation boils down to keeping track
+ * of which blocks in the underlying file belong to which logical tape,
+ * plus any blocks that are free (recycled and not yet reused). Normally
+ * there are not very many free blocks, so we just keep those in a list.
+ * The blocks in each logical tape are remembered using a method borrowed
+ * from the Unix HFS filesystem: we store data block numbers in an
+ * "indirect block". If an indirect block fills up, we write it out to
+ * the underlying file and remember its location in a second-level indirect
+ * block. In the same way second-level blocks are remembered in third-
+ * level blocks, and so on if necessary (of course we're talking huge
+ * amounts of data here). The topmost indirect block of a given logical
+ * tape is never actually written out to the physical file, but all lower-
+ * level indirect blocks will be.
+ *
+ * The initial write pass is guaranteed to fill the underlying file
+ * perfectly sequentially, no matter how data is divided into logical tapes.
+ * Once we begin merge passes, the access pattern becomes considerably
+ * less predictable --- but the seeking involved should be comparable to
+ * what would happen if we kept each logical tape in a separate file,
+ * so there's no serious performance penalty paid to obtain the space
+ * savings of recycling. We try to localize the write accesses by always
+ * writing to the lowest-numbered free block when we have a choice; it's
+ * not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
+ * policy for free blocks would be better?)
+ *
+ * Since all the bookkeeping and buffer memory is allocated with palloc(),
+ * and the underlying file(s) are made with OpenTemporaryFile, all resources
+ * for a logical tape set are certain to be cleaned up even if processing
+ * is aborted by elog(ERROR). To avoid confusion, the caller should take
+ * care that all calls for a single LogicalTapeSet are made in the same
+ * palloc context.
+ *
+ * Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $Header: /cvsroot/pgsql/src/backend/utils/sort/logtape.c,v 1.1 1999/10/16 19:49:27 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "storage/buffile.h"
+#include "utils/logtape.h"
+
+/*
+ * Block indexes are "long"s, so we can fit this many per indirect block.
+ * NB: we assume this is an exact fit!
+ */
+#define BLOCKS_PER_INDIR_BLOCK (BLCKSZ / sizeof(long))
+
+/*
+ * We use a struct like this for each active indirection level of each
+ * logical tape. If the indirect block is not the highest level of its
+ * tape, the "nextup" link points to the next higher level. Only the
+ * "ptrs" array is written out if we have to dump the indirect block to
+ * disk. If "ptrs" is not completely full, we store -1L in the first
+ * unused slot at completion of the write phase for the logical tape.
+ */
+typedef struct IndirectBlock
+{
+ int nextSlot; /* next pointer slot to write or read */
+ struct IndirectBlock *nextup; /* parent indirect level, or NULL if top */
+ long ptrs[BLOCKS_PER_INDIR_BLOCK]; /* indexes of contained blocks */
+} IndirectBlock;
+
+/*
+ * This data structure represents a single "logical tape" within the set
+ * of logical tapes stored in the same file. We must keep track of the
+ * current partially-read-or-written data block as well as the active
+ * indirect block level(s).
+ */
+typedef struct LogicalTape
+{
+ IndirectBlock *indirect; /* bottom of my indirect-block hierarchy */
+ bool writing; /* T while in write phase */
+ bool frozen; /* T if blocks should not be freed when read */
+ bool dirty; /* does buffer need to be written? */
+ /*
+ * The total data volume in the logical tape is numFullBlocks * BLCKSZ
+ * + lastBlockBytes. BUT: we do not update lastBlockBytes during writing,
+ * only at completion of a write phase.
+ */
+ long numFullBlocks; /* number of complete blocks in log tape */
+ int lastBlockBytes; /* valid bytes in last (incomplete) block */
+ /*
+ * Buffer for current data block. Note we don't bother to store the
+ * actual file block number of the data block (during the write phase
+ * it hasn't been assigned yet, and during read we don't care anymore).
+ * But we do need the relative block number so we can detect end-of-tape
+ * while reading.
+ */
+ long curBlockNumber; /* this block's logical blk# within tape */
+ int pos; /* next read/write position in buffer */
+ int nbytes; /* total # of valid bytes in buffer */
+ char buffer[BLCKSZ];
+} LogicalTape;
+
+/*
+ * This data structure represents a set of related "logical tapes" sharing
+ * space in a single underlying file. (But that "file" may be multiple files
+ * if needed to escape OS limits on file size; buffile.c handles that for us.)
+ * The number of tapes is fixed at creation.
+ */
+struct LogicalTapeSet
+{
+ BufFile *pfile; /* underlying file for whole tape set */
+ long nFileBlocks; /* # of blocks used in underlying file */
+ /*
+ * We store the numbers of recycled-and-available blocks in freeBlocks[].
+ * When there are no such blocks, we extend the underlying file. Note
+ * that the block numbers in freeBlocks are always in *decreasing* order,
+ * so that removing the last entry gives us the lowest free block.
+ */
+ long *freeBlocks; /* resizable array */
+ int nFreeBlocks; /* # of currently free blocks */
+ int freeBlocksLen; /* current allocated length of freeBlocks[] */
+ /*
+ * tapes[] is declared size 1 since C wants a fixed size, but actually
+ * it is of length nTapes.
+ */
+ int nTapes; /* # of logical tapes in set */
+ LogicalTape *tapes[1]; /* must be last in struct! */
+};
+
+static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
+static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
+static long ltsGetFreeBlock(LogicalTapeSet *lts);
+static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
+static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
+ long blocknum);
+static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
+ IndirectBlock *indirect,
+ bool freezing);
+static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
+ IndirectBlock *indirect);
+static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
+ IndirectBlock *indirect,
+ bool frozen);
+static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
+ IndirectBlock *indirect);
+static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
+
+
+/*
+ * Write a block-sized buffer to the specified block of the underlying file.
+ *
+ * NB: should not attempt to write beyond current end of file (ie, create
+ * "holes" in file), since BufFile doesn't allow that. The first write pass
+ * must write blocks sequentially.
+ *
+ * No need for an error return convention; we elog() on any error.
+ */
+static void
+ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
+{
+ if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
+ BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
+ elog(ERROR, "ltsWriteBlock: failed to write block %ld of temporary file\n\t\tPerhaps out of disk space?",
+ blocknum);
+}
+
+/*
+ * Read a block-sized buffer from the specified block of the underlying file.
+ *
+ * No need for an error return convention; we elog() on any error. This
+ * module should never attempt to read a block it doesn't know is there.
+ */
+static void
+ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
+{
+ if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
+ BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
+ elog(ERROR, "ltsReadBlock: failed to read block %ld of temporary file",
+ blocknum);
+}
+
+/*
+ * Select a currently unused block for writing to.
+ *
+ * NB: should only be called when writer is ready to write immediately,
+ * to ensure that first write pass is sequential.
+ */
+static long
+ltsGetFreeBlock(LogicalTapeSet *lts)
+{
+ /* If there are multiple free blocks, we select the one appearing last
+ * in freeBlocks[]. If there are none, assign the next block at the end
+ * of the file.
+ */
+ if (lts->nFreeBlocks > 0)
+ return lts->freeBlocks[--lts->nFreeBlocks];
+ else
+ return lts->nFileBlocks++;
+}
+
+/*
+ * Return a block# to the freelist.
+ */
+static void
+ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
+{
+ int ndx;
+ long *ptr;
+
+ /*
+ * Enlarge freeBlocks array if full.
+ */
+ if (lts->nFreeBlocks >= lts->freeBlocksLen)
+ {
+ lts->freeBlocksLen *= 2;
+ lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
+ lts->freeBlocksLen * sizeof(long));
+ }
+ /*
+ * Insert blocknum into array, preserving decreasing order (so that
+ * ltsGetFreeBlock returns the lowest available block number).
+ * This could get fairly slow if there were many free blocks, but
+ * we don't expect there to be very many at one time.
+ */
+ ndx = lts->nFreeBlocks++;
+ ptr = lts->freeBlocks + ndx;
+ while (ndx > 0 && ptr[-1] < blocknum)
+ {
+ ptr[0] = ptr[-1];
+ ndx--, ptr--;
+ }
+ ptr[0] = blocknum;
+}
+
+/*
+ * These routines manipulate indirect-block hierarchies. All are recursive
+ * so that they don't have any specific limit on the depth of hierarchy.
+ */
+
+/*
+ * Record a data block number in a logical tape's lowest indirect block,
+ * or record an indirect block's number in the next higher indirect level.
+ */
+static void
+ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
+ long blocknum)
+{
+ if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
+ {
+ /*
+ * This indirect block is full, so dump it out and recursively
+ * save its address in the next indirection level. Create a
+ * new indirection level if there wasn't one before.
+ */
+ long indirblock = ltsGetFreeBlock(lts);
+
+ ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
+ if (indirect->nextup == NULL)
+ {
+ indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
+ indirect->nextup->nextSlot = 0;
+ indirect->nextup->nextup = NULL;
+ }
+ ltsRecordBlockNum(lts, indirect->nextup, indirblock);
+ /*
+ * Reset to fill another indirect block at this level.
+ */
+ indirect->nextSlot = 0;
+ }
+ indirect->ptrs[indirect->nextSlot++] = blocknum;
+}
+
+/*
+ * Reset a logical tape's indirect-block hierarchy after a write pass
+ * to prepare for reading. We dump out partly-filled blocks except
+ * at the top of the hierarchy, and we rewind each level to the start.
+ * This call returns the first data block number, or -1L if the tape
+ * is empty.
+ *
+ * Unless 'freezing' is true, release indirect blocks to the free pool after
+ * reading them.
+ */
+static long
+ltsRewindIndirectBlock(LogicalTapeSet *lts,
+ IndirectBlock *indirect,
+ bool freezing)
+{
+ /* Insert sentinel if block is not full */
+ if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
+ indirect->ptrs[indirect->nextSlot] = -1L;
+ /*
+ * If block is not topmost, write it out, and recurse to obtain
+ * address of first block in this hierarchy level. Read that one in.
+ */
+ if (indirect->nextup != NULL)
+ {
+ long indirblock = ltsGetFreeBlock(lts);
+
+ ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
+ ltsRecordBlockNum(lts, indirect->nextup, indirblock);
+ indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
+ Assert(indirblock != -1L);
+ ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
+ if (! freezing)
+ ltsReleaseBlock(lts, indirblock);
+ }
+ /*
+ * Reset my next-block pointer, and then fetch a block number if any.
+ */
+ indirect->nextSlot = 0;
+ if (indirect->ptrs[0] == -1L)
+ return -1L;
+ return indirect->ptrs[indirect->nextSlot++];
+}
+
+/*
+ * Rewind a previously-frozen indirect-block hierarchy for another read pass.
+ * This call returns the first data block number, or -1L if the tape
+ * is empty.
+ */
+static long
+ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
+ IndirectBlock *indirect)
+{
+ /*
+ * If block is not topmost, recurse to obtain
+ * address of first block in this hierarchy level. Read that one in.
+ */
+ if (indirect->nextup != NULL)
+ {
+ long indirblock;
+
+ indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
+ Assert(indirblock != -1L);
+ ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
+ }
+ /*
+ * Reset my next-block pointer, and then fetch a block number if any.
+ */
+ indirect->nextSlot = 0;
+ if (indirect->ptrs[0] == -1L)
+ return -1L;
+ return indirect->ptrs[indirect->nextSlot++];
+}
+
+/*
+ * Obtain next data block number in the forward direction, or -1L if no more.
+ *
+ * Unless 'frozen' is true, release indirect blocks to the free pool after
+ * reading them.
+ */
+static long
+ltsRecallNextBlockNum(LogicalTapeSet *lts,
+ IndirectBlock *indirect,
+ bool frozen)
+{
+ if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
+ indirect->ptrs[indirect->nextSlot] == -1L)
+ {
+ long indirblock;
+
+ if (indirect->nextup == NULL)
+ return -1L; /* nothing left at this level */
+ indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
+ if (indirblock == -1L)
+ return -1L; /* nothing left at this level */
+ ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
+ if (! frozen)
+ ltsReleaseBlock(lts, indirblock);
+ indirect->nextSlot = 0;
+ }
+ if (indirect->ptrs[indirect->nextSlot] == -1L)
+ return -1L;
+ return indirect->ptrs[indirect->nextSlot++];
+}
+
+/*
+ * Obtain next data block number in the reverse direction, or -1L if no more.
+ *
+ * Note this fetches the block# before the one last returned, no matter which
+ * direction of call returned that one. If we fail, no change in state.
+ *
+ * This routine can only be used in 'frozen' state, so there's no need to
+ * pass a parameter telling whether to release blocks ... we never do.
+ */
+static long
+ltsRecallPrevBlockNum(LogicalTapeSet *lts,
+ IndirectBlock *indirect)
+{
+ if (indirect->nextSlot <= 1)
+ {
+ long indirblock;
+
+ if (indirect->nextup == NULL)
+ return -1L; /* nothing left at this level */
+ indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
+ if (indirblock == -1L)
+ return -1L; /* nothing left at this level */
+ ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
+ /* The previous block would only have been written out if full,
+ * so we need not search it for a -1 sentinel.
+ */
+ indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK+1;
+ }
+ indirect->nextSlot--;
+ return indirect->ptrs[indirect->nextSlot-1];
+}
+
+
+/*
+ * Create a set of logical tapes in a temporary underlying file.
+ *
+ * Each tape is initialized in write state.
+ */
+LogicalTapeSet *
+LogicalTapeSetCreate(int ntapes)
+{
+ LogicalTapeSet *lts;
+ LogicalTape *lt;
+ int i;
+
+ /*
+ * Create top-level struct. First LogicalTape pointer is already
+ * counted in sizeof(LogicalTapeSet).
+ */
+ Assert(ntapes > 0);
+ lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet) +
+ (ntapes-1) * sizeof(LogicalTape *));
+ lts->pfile = BufFileCreateTemp();
+ lts->nFileBlocks = 0L;
+ lts->freeBlocksLen = 32; /* reasonable initial guess */
+ lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
+ lts->nFreeBlocks = 0;
+ lts->nTapes = ntapes;
+ /*
+ * Create per-tape structs, including first-level indirect blocks.
+ */
+ for (i = 0; i < ntapes; i++)
+ {
+ lt = (LogicalTape *) palloc(sizeof(LogicalTape));
+ lts->tapes[i] = lt;
+ lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
+ lt->indirect->nextSlot = 0;
+ lt->indirect->nextup = NULL;
+ lt->writing = true;
+ lt->frozen = false;
+ lt->dirty = false;
+ lt->numFullBlocks = 0L;
+ lt->lastBlockBytes = 0;
+ lt->curBlockNumber = 0L;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ }
+ return lts;
+}
+
+/*
+ * Close a logical tape set and release all resources.
+ */
+void LogicalTapeSetClose(LogicalTapeSet *lts)
+{
+ LogicalTape *lt;
+ IndirectBlock *ib,
+ *nextib;
+ int i;
+
+ BufFileClose(lts->pfile);
+ for (i = 0; i < lts->nTapes; i++)
+ {
+ lt = lts->tapes[i];
+ for (ib = lt->indirect; ib != NULL; ib = nextib)
+ {
+ nextib = ib->nextup;
+ pfree(ib);
+ }
+ pfree(lt);
+ }
+ pfree(lts->freeBlocks);
+ pfree(lts);
+}
+
+/*
+ * Dump the dirty buffer of a logical tape.
+ */
+static void
+ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
+{
+ long datablock = ltsGetFreeBlock(lts);
+
+ Assert(lt->dirty);
+ ltsWriteBlock(lts, datablock, (void *) lt->buffer);
+ ltsRecordBlockNum(lts, lt->indirect, datablock);
+ lt->dirty = false;
+ /* Caller must do other state update as needed */
+}
+
+/*
+ * Write to a logical tape.
+ *
+ * There are no error returns; we elog() on failure.
+ */
+void
+LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
+ void *ptr, size_t size)
+{
+ LogicalTape *lt;
+ size_t nthistime;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = lts->tapes[tapenum];
+ Assert(lt->writing);
+
+ while (size > 0)
+ {
+ if (lt->pos >= BLCKSZ)
+ {
+ /* Buffer full, dump it out */
+ if (lt->dirty)
+ {
+ ltsDumpBuffer(lts, lt);
+ }
+ else
+ {
+ /* Hmm, went directly from reading to writing? */
+ elog(ERROR, "LogicalTapeWrite: impossible state");
+ }
+ lt->numFullBlocks++;
+ lt->curBlockNumber++;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ }
+
+ nthistime = BLCKSZ - lt->pos;
+ if (nthistime > size)
+ nthistime = size;
+ Assert(nthistime > 0);
+
+ memcpy(lt->buffer + lt->pos, ptr, nthistime);
+
+ lt->dirty = true;
+ lt->pos += nthistime;
+ if (lt->nbytes < lt->pos)
+ lt->nbytes = lt->pos;
+ ptr = (void *) ((char *) ptr + nthistime);
+ size -= nthistime;
+ }
+}
+
+/*
+ * Rewind logical tape and switch from writing to reading or vice versa.
+ *
+ * Unless the tape has been "frozen" in read state, forWrite must be the
+ * opposite of the previous tape state.
+ */
+void
+LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
+{
+ LogicalTape *lt;
+ long datablocknum;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = lts->tapes[tapenum];
+
+ if (! forWrite)
+ {
+ if (lt->writing)
+ {
+ /*
+ * Completion of a write phase. Flush last partial data
+ * block, flush any partial indirect blocks, rewind for
+ * normal (destructive) read.
+ */
+ if (lt->dirty)
+ ltsDumpBuffer(lts, lt);
+ lt->lastBlockBytes = lt->nbytes;
+ lt->writing = false;
+ datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
+ }
+ else
+ {
+ /*
+ * This is only OK if tape is frozen; we rewind for (another)
+ * read pass.
+ */
+ Assert(lt->frozen);
+ datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
+ }
+ /* Read the first block, or reset if tape is empty */
+ lt->curBlockNumber = 0L;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ if (datablocknum != -1L)
+ {
+ ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+ if (! lt->frozen)
+ ltsReleaseBlock(lts, datablocknum);
+ lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
+ BLCKSZ : lt->lastBlockBytes;
+ }
+ }
+ else
+ {
+ /*
+ * Completion of a read phase. Rewind and prepare for write.
+ *
+ * NOTE: we assume the caller has read the tape to the end;
+ * otherwise untouched data and indirect blocks will not have
+ * been freed. We could add more code to free any unread blocks,
+ * but in current usage of this module it'd be useless code.
+ */
+ IndirectBlock *ib,
+ *nextib;
+
+ Assert(! lt->writing && ! lt->frozen);
+ /* Must truncate the indirect-block hierarchy down to one level. */
+ for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
+ {
+ nextib = ib->nextup;
+ pfree(ib);
+ }
+ lt->indirect->nextSlot = 0;
+ lt->indirect->nextup = NULL;
+ lt->writing = true;
+ lt->dirty = false;
+ lt->numFullBlocks = 0L;
+ lt->lastBlockBytes = 0;
+ lt->curBlockNumber = 0L;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ }
+}
+
+/*
+ * Read from a logical tape.
+ *
+ * Early EOF is indicated by return value less than #bytes requested.
+ */
+size_t
+LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
+ void *ptr, size_t size)
+{
+ LogicalTape *lt;
+ size_t nread = 0;
+ size_t nthistime;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = lts->tapes[tapenum];
+ Assert(! lt->writing);
+
+ while (size > 0)
+ {
+ if (lt->pos >= lt->nbytes)
+ {
+ /* Try to load more data into buffer. */
+ long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
+ lt->frozen);
+
+ if (datablocknum == -1L)
+ break; /* EOF */
+ lt->curBlockNumber++;
+ lt->pos = 0;
+ ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+ if (! lt->frozen)
+ ltsReleaseBlock(lts, datablocknum);
+ lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
+ BLCKSZ : lt->lastBlockBytes;
+ if (lt->nbytes <= 0)
+ break; /* EOF (possible here?) */
+ }
+
+ nthistime = lt->nbytes - lt->pos;
+ if (nthistime > size)
+ nthistime = size;
+ Assert(nthistime > 0);
+
+ memcpy(ptr, lt->buffer + lt->pos, nthistime);
+
+ lt->pos += nthistime;
+ ptr = (void *) ((char *) ptr + nthistime);
+ size -= nthistime;
+ nread += nthistime;
+ }
+
+ return nread;
+}
+
+/*
+ * "Freeze" the contents of a tape so that it can be read multiple times
+ * and/or read backwards. Once a tape is frozen, its contents will not
+ * be released until the LogicalTapeSet is destroyed. This is expected
+ * to be used only for the final output pass of a merge.
+ *
+ * This *must* be called just at the end of a write pass, before the
+ * tape is rewound (after rewind is too late!). It performs a rewind
+ * and switch to read mode "for free". An immediately following rewind-
+ * for-read call is OK but not necessary.
+ */
+void
+LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
+{
+ LogicalTape *lt;
+ long datablocknum;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = lts->tapes[tapenum];
+ Assert(lt->writing);
+
+ /*
+ * Completion of a write phase. Flush last partial data
+ * block, flush any partial indirect blocks, rewind for
+ * nondestructive read.
+ */
+ if (lt->dirty)
+ ltsDumpBuffer(lts, lt);
+ lt->lastBlockBytes = lt->nbytes;
+ lt->writing = false;
+ lt->frozen = true;
+ datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
+ /* Read the first block, or reset if tape is empty */
+ lt->curBlockNumber = 0L;
+ lt->pos = 0;
+ lt->nbytes = 0;
+ if (datablocknum != -1L)
+ {
+ ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+ lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
+ BLCKSZ : lt->lastBlockBytes;
+ }
+}
+
+/*
+ * Backspace the tape a given number of bytes. (We also support a more
+ * general seek interface, see below.)
+ *
+ * *Only* a frozen-for-read tape can be backed up; we don't support
+ * random access during write, and an unfrozen read tape may have
+ * already discarded the desired data!
+ *
+ * Return value is TRUE if seek successful, FALSE if there isn't that much
+ * data before the current point (in which case there's no state change).
+ */
+bool
+LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
+{
+ LogicalTape *lt;
+ long nblocks;
+ int newpos;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = lts->tapes[tapenum];
+ Assert(lt->frozen);
+
+ /*
+ * Easy case for seek within current block.
+ */
+ if (size <= (size_t) lt->pos)
+ {
+ lt->pos -= (int) size;
+ return true;
+ }
+ /*
+ * Not-so-easy case. Figure out whether it's possible at all.
+ */
+ size -= (size_t) lt->pos; /* part within this block */
+ nblocks = size / BLCKSZ;
+ size = size % BLCKSZ;
+ if (size)
+ {
+ nblocks++;
+ newpos = (int) (BLCKSZ - size);
+ }
+ else
+ newpos = 0;
+ if (nblocks > lt->curBlockNumber)
+ return false; /* a seek too far... */
+ /*
+ * OK, we need to back up nblocks blocks. This implementation
+ * would be pretty inefficient for long seeks, but we really
+ * aren't expecting that (a seek over one tuple is typical).
+ */
+ while (nblocks-- > 0)
+ {
+ long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
+
+ if (datablocknum == -1L)
+ elog(ERROR, "LogicalTapeBackspace: unexpected end of tape");
+ lt->curBlockNumber--;
+ if (nblocks == 0)
+ {
+ ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+ lt->nbytes = BLCKSZ;
+ }
+ }
+ lt->pos = newpos;
+ return true;
+}
+
+/*
+ * Seek to an arbitrary position in a logical tape.
+ *
+ * *Only* a frozen-for-read tape can be seeked.
+ *
+ * Return value is TRUE if seek successful, FALSE if there isn't that much
+ * data in the tape (in which case there's no state change).
+ */
+bool
+LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
+ long blocknum, int offset)
+{
+ LogicalTape *lt;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = lts->tapes[tapenum];
+ Assert(lt->frozen);
+ Assert(offset >= 0 && offset <= BLCKSZ);
+
+ /*
+ * Easy case for seek within current block.
+ */
+ if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
+ {
+ lt->pos = offset;
+ return true;
+ }
+ /*
+ * Not-so-easy case. Figure out whether it's possible at all.
+ */
+ if (blocknum < 0 || blocknum > lt->numFullBlocks ||
+ (blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
+ return false;
+ /*
+ * OK, advance or back up to the target block. This implementation
+ * would be pretty inefficient for long seeks, but we really
+ * aren't expecting that (a seek over one tuple is typical).
+ */
+ while (lt->curBlockNumber > blocknum)
+ {
+ long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
+
+ if (datablocknum == -1L)
+ elog(ERROR, "LogicalTapeSeek: unexpected end of tape");
+ if (--lt->curBlockNumber == blocknum)
+ ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+ }
+ while (lt->curBlockNumber < blocknum)
+ {
+ long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
+ lt->frozen);
+
+ if (datablocknum == -1L)
+ elog(ERROR, "LogicalTapeSeek: unexpected end of tape");
+ if (++lt->curBlockNumber == blocknum)
+ ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
+ }
+ lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
+ BLCKSZ : lt->lastBlockBytes;
+ lt->pos = offset;
+ return true;
+}
+
+/*
+ * Obtain current position in a form suitable for a later LogicalTapeSeek.
+ *
+ * NOTE: it'd be OK to do this during write phase with intention of using
+ * the position for a seek after freezing. Not clear if anyone needs that.
+ */
+void
+LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
+ long *blocknum, int *offset)
+{
+ LogicalTape *lt;
+
+ Assert(tapenum >= 0 && tapenum < lts->nTapes);
+ lt = lts->tapes[tapenum];
+ *blocknum = lt->curBlockNumber;
+ *offset = lt->pos;
+}
-/*
+/*-------------------------------------------------------------------------
+ *
* psort.c
* Polyphase merge sort.
*
- * Copyright (c) 1994, Regents of the University of California
- *
- * $Id: psort.c,v 1.57 1999/10/13 15:02:31 tgl Exp $
+ * See Knuth, volume 3, for more than you want to know about this algorithm.
*
* NOTES
- * Sorts the first relation into the second relation.
*
- * The old psort.c's routines formed a temporary relation from the merged
- * sort files. This version keeps the files around instead of generating the
- * relation from them, and provides interface functions to the file so that
- * you can grab tuples, mark a position in the file, restore a position in the
- * file. You must now explicitly call an interface function to end the sort,
- * psort_end, when you are done.
- * Now most of the global variables are stuck in the Sort nodes, and
- * accessed from there (they are passed to all the psort routines) so that
- * each sort running has its own separate state. This is facilitated by having
- * the Sort nodes passed in to all the interface functions.
- * The one global variable that all the sorts still share is SortMemory.
- * You should now be allowed to run two or more psorts concurrently,
- * so long as the memory they eat up is not greater than SORTMEM, the initial
- * value of SortMemory. -Rex 2.15.1995
+ * This needs to be generalized to handle index tuples as well as heap tuples,
+ * so that the near-duplicate code in nbtsort.c can be eliminated. Also,
+ * I think it's got memory leak problems.
*
- * Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future.
+ * Copyright (c) 1994, Regents of the University of California
*
- * Arguments? Variables?
- * MAXMERGE, MAXTAPES
+ * IDENTIFICATION
+ * $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.58 1999/10/16 19:49:27 tgl Exp $
*
+ *-------------------------------------------------------------------------
*/
+
#include <math.h>
-#include <sys/types.h>
-#include <unistd.h>
#include "postgres.h"
#include "access/heapam.h"
+#include "access/relscan.h"
#include "executor/execdebug.h"
#include "executor/executor.h"
#include "miscadmin.h"
+#include "utils/logtape.h"
+#include "utils/lselect.h"
#include "utils/psort.h"
+#define MAXTAPES 7 /* See Knuth Fig. 70, p273 */
+
+struct tape
+{
+ int tp_dummy; /* (D) */
+ int tp_fib; /* (A) */
+ int tp_tapenum; /* (TAPE) */
+ struct tape *tp_prev;
+};
+
+/*
+ * Private state of a Psort operation. The "psortstate" field in a Sort node
+ * points to one of these. This replaces a lot of global variables that used
+ * to be here...
+ */
+typedef struct Psortstate
+{
+ LeftistContextData treeContext;
+
+ int TapeRange; /* number of tapes less 1 (T) */
+ int Level; /* Knuth's l */
+ int TotalDummy; /* sum of tp_dummy across all tapes */
+ struct tape Tape[MAXTAPES];
+
+ LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
+
+ int BytesRead; /* I/O statistics (useless) */
+ int BytesWritten;
+ int tupcount;
+
+ struct leftist *Tuples; /* current tuple tree */
+
+ int psort_grab_tape; /* tape number of finished output data */
+ long psort_current; /* array index (only used if not tape) */
+ /* psort_saved(_offset) holds marked position for mark and restore */
+ long psort_saved; /* could be tape block#, or array index */
+ int psort_saved_offset; /* lower bits of psort_saved, if tape */
+ bool using_tape_files;
+ bool all_fetched; /* this is for cursors */
+
+ HeapTuple *memtuples;
+} Psortstate;
+
+/*
+ * PS - Macro to access and cast psortstate from a Sort node
+ */
+#define PS(N) ((Psortstate *)(N)->psortstate)
+
static bool createfirstrun(Sort *node);
-static bool createrun(Sort *node, BufFile *file);
-static void destroytape(BufFile *file);
-static void dumptuples(BufFile *file, Sort *node);
-static BufFile *gettape(void);
+static bool createrun(Sort *node, int desttapenum);
+static void dumptuples(Sort *node, int desttapenum);
static void initialrun(Sort *node);
static void inittapes(Sort *node);
static void merge(Sort *node, struct tape * dest);
-static BufFile *mergeruns(Sort *node);
+static int mergeruns(Sort *node);
static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
-
-/*
- * tlenzero used to delimit runs; both vars below must have
- * the same size as HeapTuple->t_len
- */
-static unsigned int tlenzero = 0;
-static unsigned int tlendummy;
-
/* these are used by _psort_cmp, and are set just before calling qsort() */
static TupleDesc PsortTupDesc;
static ScanKey PsortKeys;
static int PsortNkeys;
/*
- * old psort global variables
- *
- * (These are the global variables from the old psort. They are still used,
- * but are now accessed from Sort nodes using the PS macro. Note that while
- * these variables will be accessed by PS(node)->whatever, they will still
- * be called by their original names within the comments! -Rex 2.10.1995)
+ * tlenzero is used to write a zero to delimit runs, tlendummy is used
+ * to read in length words that we don't care about.
*
- * LeftistContextData treeContext;
- *
- * static int TapeRange; number of tapes - 1 (T)
- * static int Level; (l)
- * static int TotalDummy; summation of tp_dummy
- * static struct tape *Tape;
- *
- * static int BytesRead; to keep track of # of IO
- * static int BytesWritten;
- *
- * struct leftist *Tuples; current tuples in memory
- *
- * BufFile *psort_grab_file; this holds tuples grabbed
- * from merged sort runs
- * long psort_current; current file position
- * long psort_saved; file position saved for
- * mark and restore
+ * both vars must have the same size as HeapTuple->t_len
*/
+static unsigned int tlenzero = 0;
+static unsigned int tlendummy;
-/*
- * PS - Macro to access and cast psortstate from a Sort node
- */
-#define PS(N) ((Psortstate *)N->psortstate)
/*
- * psort_begin - polyphase merge sort entry point. Sorts the subplan
- * into a temporary file psort_grab_file. After
- * this is called, calling the interface function
- * psort_grabtuple iteratively will get you the sorted
- * tuples. psort_end then finishes the sort off, after
- * all the tuples have been grabbed.
+ * psort_begin
*
- * Allocates and initializes sort node's psort state.
+ * polyphase merge sort entry point. Sorts the subplan
+ * into memory or a temporary file. After
+ * this is called, calling the interface function
+ * psort_grabtuple iteratively will get you the sorted
+ * tuples. psort_end releases storage when done.
+ *
+ * Allocates and initializes sort node's psort state.
*/
bool
psort_begin(Sort *node, int nkeys, ScanKey key)
{
-
- node->psortstate = (struct Psortstate *) palloc(sizeof(struct Psortstate));
-
AssertArg(nkeys >= 1);
AssertArg(key[0].sk_attno != 0);
AssertArg(key[0].sk_procedure != 0);
- PS(node)->BytesRead = 0;
- PS(node)->BytesWritten = 0;
+ node->psortstate = (void *) palloc(sizeof(struct Psortstate));
+
PS(node)->treeContext.tupDesc = ExecGetTupType(outerPlan((Plan *) node));
PS(node)->treeContext.nKeys = nkeys;
PS(node)->treeContext.scanKeys = key;
PS(node)->treeContext.sortMem = SortMem * 1024;
- PS(node)->Tuples = NULL;
+ PS(node)->tapeset = NULL;
+
+ PS(node)->BytesRead = 0;
+ PS(node)->BytesWritten = 0;
PS(node)->tupcount = 0;
+ PS(node)->Tuples = NULL;
+
PS(node)->using_tape_files = false;
PS(node)->all_fetched = false;
- PS(node)->psort_grab_file = NULL;
+ PS(node)->psort_grab_tape = -1;
+
PS(node)->memtuples = NULL;
initialrun(node);
if (PS(node)->tupcount == 0)
return false;
- if (PS(node)->using_tape_files && PS(node)->psort_grab_file == NULL)
- PS(node)->psort_grab_file = mergeruns(node);
+ if (PS(node)->using_tape_files && PS(node)->psort_grab_tape == -1)
+ PS(node)->psort_grab_tape = mergeruns(node);
- PS(node)->psort_current = 0;
- PS(node)->psort_saved_fileno = 0;
+ PS(node)->psort_current = 0L;
PS(node)->psort_saved = 0L;
+ PS(node)->psort_saved_offset = 0;
return true;
}
/*
* inittapes - initializes the tapes
* - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270)
- * Returns:
- * number of allocated tapes
+ *
+ * This is called only if we have found we don't have room to sort in memory.
*/
static void
inittapes(Sort *node)
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
- /*
- * ASSERT(ntapes >= 3 && ntapes <= MAXTAPES, "inittapes: Invalid
- * number of tapes to initialize.\n");
- */
+ PS(node)->tapeset = LogicalTapeSetCreate(MAXTAPES);
tp = PS(node)->Tape;
- for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++)
+ for (i = 0; i < MAXTAPES; i++)
{
tp->tp_dummy = 1;
tp->tp_fib = 1;
+ tp->tp_tapenum = i;
tp->tp_prev = tp - 1;
tp++;
}
tp->tp_fib = 0;
PS(node)->Tape[0].tp_prev = tp;
- if (PS(node)->TapeRange <= 1)
- elog(ERROR, "inittapes: Could only allocate %d < 3 tapes\n",
- PS(node)->TapeRange + 1);
-
PS(node)->Level = 1;
PS(node)->TotalDummy = PS(node)->TapeRange;
/*
* PUTTUP - writes the next tuple
* ENDRUN - mark end of run
- * GETLEN - reads the length of the next tuple
+ * TRYGETLEN - reads the length of the next tuple, if any
+ * GETLEN - reads the length of the next tuple, must be one
* ALLOCTUP - returns space for the new tuple
- * SETTUPLEN - stores the length into the tuple
* GETTUP - reads the tuple
*
* Note:
*/
-#define PUTTUP(NODE, TUP, FP) \
+#define PUTTUP(NODE, TUP, TAPE) \
( \
(TUP)->t_len += HEAPTUPLESIZE, \
- ((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, \
- BufFileWrite(FP, (char *)TUP, (TUP)->t_len), \
- BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)), \
+ PS(NODE)->BytesWritten += (TUP)->t_len, \
+ LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void*)(TUP), (TUP)->t_len), \
+ LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void*)&((TUP)->t_len), sizeof(tlendummy)), \
(TUP)->t_len -= HEAPTUPLESIZE \
)
-#define ENDRUN(FP) BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero))
-#define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero))
-#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
-#define FREE(x) pfree((char *) x)
-#define GETTUP(NODE, TUP, LEN, FP) \
-( \
- IncrProcessed(), \
- ((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero), \
- BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)), \
- (TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), \
- BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy)) \
-)
+#define ENDRUN(NODE, TAPE) \
+ LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void *)&tlenzero, sizeof(tlenzero))
+
+#define TRYGETLEN(NODE, LEN, TAPE) \
+ (LogicalTapeRead(PS(NODE)->tapeset, (TAPE), \
+ (void *) &(LEN), sizeof(tlenzero)) == sizeof(tlenzero) \
+ && (LEN) != 0)
-#define SETTUPLEN(TUP, LEN) ((TUP)->t_len = (LEN) - HEAPTUPLESIZE)
+#define GETLEN(NODE, LEN, TAPE) \
+ do { \
+ if (! TRYGETLEN(NODE, LEN, TAPE)) \
+ elog(ERROR, "psort: unexpected end of data"); \
+ } while(0)
-#define rewind(FP) BufFileSeek(FP, 0, 0L, SEEK_SET)
+static void GETTUP(Sort *node, HeapTuple tup, unsigned int len, int tape)
+{
+ IncrProcessed();
+ PS(node)->BytesRead += len;
+ if (LogicalTapeRead(PS(node)->tapeset, tape,
+ ((char *) tup) + sizeof(tlenzero),
+ len - sizeof(tlenzero)) != len - sizeof(tlenzero))
+ elog(ERROR, "psort: unexpected end of data");
+ tup->t_len = len - HEAPTUPLESIZE;
+ tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
+ if (LogicalTapeRead(PS(node)->tapeset, tape,
+ (void *) &tlendummy,
+ sizeof(tlendummy)) != sizeof(tlendummy))
+ elog(ERROR, "psort: unexpected end of data");
+}
+
+#define ALLOCTUP(LEN) ((HeapTuple) palloc(LEN))
+#define FREE(x) pfree((char *) (x))
/*
* USEMEM - record use of memory FREEMEM - record
static void
initialrun(Sort *node)
{
- /* struct tuple *tup; */
struct tape *tp;
int baseruns; /* D:(a) */
int extrapasses; /* EOF */
+ int tapenum;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
extrapasses = 0;
}
else
-/* all tuples fetched */
{
+ /* all tuples fetched */
if (!PS(node)->using_tape_files) /* empty or sorted in
* memory */
return;
*/
if (PS(node)->Tuples == NULL)
{
- PS(node)->psort_grab_file = PS(node)->Tape->tp_file;
- rewind(PS(node)->psort_grab_file);
+ PS(node)->psort_grab_tape = PS(node)->Tape[0].tp_tapenum;
+ /* freeze and rewind the finished output tape */
+ LogicalTapeFreeze(PS(node)->tapeset, PS(node)->psort_grab_tape);
return;
}
extrapasses = 2;
{
if (--extrapasses)
{
- dumptuples(tp->tp_file, node);
- ENDRUN(tp->tp_file);
+ dumptuples(node, tp->tp_tapenum);
+ ENDRUN(node, tp->tp_tapenum);
continue;
}
else
break;
}
- if ((bool) createrun(node, tp->tp_file) == false)
+ if (createrun(node, tp->tp_tapenum) == false)
extrapasses = 1 + (PS(node)->Tuples != NULL);
/* D2 */
}
- for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
- rewind(tp->tp_file); /* D. */
+ /* End of step D2: rewind all output tapes to prepare for merging */
+ for (tapenum = 0; tapenum < PS(node)->TapeRange; tapenum++)
+ LogicalTapeRewind(PS(node)->tapeset, tapenum, false);
}
/*
Assert(PS(node)->memtuples == NULL);
Assert(PS(node)->tupcount == 0);
if (LACKMEM(node))
- elog(ERROR, "psort: LACKMEM in createfirstrun");
+ elog(ERROR, "psort: LACKMEM before createfirstrun");
memtuples = palloc(t_free * sizeof(HeapTuple));
for (t = t_last - 1; t >= 0; t--)
puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext);
pfree(memtuples);
- foundeor = !createrun(node, PS(node)->Tape->tp_file);
+ foundeor = ! createrun(node, PS(node)->Tape->tp_tapenum);
}
else
{
}
/*
- * createrun - places the next run on file, grabbing the tuples by
- * executing the subplan passed in
+ * createrun
+ *
+ * Create the next run and write it to desttapenum, grabbing the tuples by
+ * executing the subplan passed in
*
* Uses:
* Tuples, which should contain any tuples for this run
* Tuples contains the tuples for the following run upon exit
*/
static bool
-createrun(Sort *node, BufFile *file)
+createrun(Sort *node, int desttapenum)
{
HeapTuple lasttuple;
HeapTuple tup;
}
lasttuple = gettuple(&PS(node)->Tuples, &junk,
&PS(node)->treeContext);
- PUTTUP(node, lasttuple, file);
+ PUTTUP(node, lasttuple, desttapenum);
TRACEOUT(createrun, lasttuple);
}
FREE(lasttuple);
TRACEMEM(createrun);
}
- dumptuples(file, node);
- ENDRUN(file); /* delimit the end of the run */
+ dumptuples(node, desttapenum);
+ ENDRUN(node, desttapenum); /* delimit the end of the run */
t_last++;
/* put tuples for the next run into leftist tree */
* (polyphase merge Alg.D(D6)--Knuth, Vol.3, p271)
*
* Returns:
- * file of tuples in order
+ * tape number of finished tape containing all tuples in order
*/
-static BufFile *
+static int
mergeruns(Sort *node)
{
struct tape *tp;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
- Assert(PS(node)->using_tape_files == true);
+ Assert(PS(node)->using_tape_files);
tp = PS(node)->Tape + PS(node)->TapeRange;
merge(node, tp);
- rewind(tp->tp_file);
while (--PS(node)->Level != 0)
{
+ /* rewind output tape to use as new input */
+ LogicalTapeRewind(PS(node)->tapeset, tp->tp_tapenum, false);
tp = tp->tp_prev;
- rewind(tp->tp_file);
+ /* rewind new output tape and prepare it for write pass */
+ LogicalTapeRewind(PS(node)->tapeset, tp->tp_tapenum, true);
merge(node, tp);
- rewind(tp->tp_file);
}
- return tp->tp_file;
+ /* freeze and rewind the final output tape */
+ LogicalTapeFreeze(PS(node)->tapeset, tp->tp_tapenum);
+ return tp->tp_tapenum;
}
/*
struct tape *lasttp; /* (TAPE[P]) */
struct tape *tp;
struct leftist *tuples;
- BufFile *destfile;
+ int desttapenum;
int times; /* runs left to merge */
int outdummy; /* complete dummy runs */
short fromtape;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
- Assert(PS(node)->using_tape_files == true);
+ Assert(PS(node)->using_tape_files);
lasttp = dest->tp_prev;
times = lasttp->tp_fib;
/* do not add the outdummy runs yet */
times -= outdummy;
}
- destfile = dest->tp_file;
+ desttapenum = dest->tp_tapenum;
while (times-- != 0)
{ /* merge one run */
tuples = NULL;
if (PS(node)->TotalDummy == 0)
for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev)
{
- GETLEN(tuplen, tp->tp_file);
+ GETLEN(node, tuplen, tp->tp_tapenum);
tup = ALLOCTUP(tuplen);
USEMEM(node, tuplen);
TRACEMEM(merge);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, tp->tp_file);
+ GETTUP(node, tup, tuplen, tp->tp_tapenum);
puttuple(&tuples, tup, tp - PS(node)->Tape,
&PS(node)->treeContext);
}
}
else
{
- GETLEN(tuplen, tp->tp_file);
+ GETLEN(node, tuplen, tp->tp_tapenum);
tup = ALLOCTUP(tuplen);
USEMEM(node, tuplen);
TRACEMEM(merge);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, tp->tp_file);
+ GETTUP(node, tup, tuplen, tp->tp_tapenum);
puttuple(&tuples, tup, tp - PS(node)->Tape,
&PS(node)->treeContext);
}
{
/* possible optimization by using count in tuples */
tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext);
- PUTTUP(node, tup, destfile);
+ PUTTUP(node, tup, desttapenum);
FREEMEM(node, tup->t_len);
FREE(tup);
TRACEMEM(merge);
- GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file);
- if (tuplen == 0)
- ;
- else
+ if (TRYGETLEN(node, tuplen, PS(node)->Tape[fromtape].tp_tapenum))
{
tup = ALLOCTUP(tuplen);
USEMEM(node, tuplen);
TRACEMEM(merge);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file);
+ GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_tapenum);
puttuple(&tuples, tup, fromtape, &PS(node)->treeContext);
}
}
- ENDRUN(destfile);
+ ENDRUN(node, desttapenum);
}
PS(node)->TotalDummy += outdummy;
}
/*
- * dumptuples - stores all the tuples in tree into file
+ * dumptuples - stores all the tuples remaining in tree to dest tape
*/
static void
-dumptuples(BufFile *file, Sort *node)
+dumptuples(Sort *node, int desttapenum)
{
+ LeftistContext context = &PS(node)->treeContext;
+ struct leftist **treep = &PS(node)->Tuples;
struct leftist *tp;
struct leftist *newp;
- struct leftist **treep = &PS(node)->Tuples;
- LeftistContext context = &PS(node)->treeContext;
HeapTuple tup;
Assert(PS(node)->using_tape_files);
else
newp = lmerge(tp->lt_left, tp->lt_right, context);
pfree(tp);
- PUTTUP(node, tup, file);
+ PUTTUP(node, tup, desttapenum);
FREEMEM(node, tup->t_len);
FREE(tup);
{
if (PS(node)->all_fetched)
return NULL;
- if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0)
+ if (TRYGETLEN(node, tuplen, PS(node)->psort_grab_tape))
{
tup = ALLOCTUP(tuplen);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
+ GETTUP(node, tup, tuplen, PS(node)->psort_grab_tape);
return tup;
}
else
* length word. If seek fails we must have a completely empty
* file.
*/
- if (BufFileSeek(PS(node)->psort_grab_file, 0,
- - (long) (2 * sizeof(tlendummy)), SEEK_CUR))
+ if (! LogicalTapeBackspace(PS(node)->tapeset,
+ PS(node)->psort_grab_tape,
+ 2 * sizeof(tlendummy)))
return NULL;
- GETLEN(tuplen, PS(node)->psort_grab_file);
+ GETLEN(node, tuplen, PS(node)->psort_grab_tape);
PS(node)->all_fetched = false;
}
else
* Back up and fetch prev tuple's ending length word.
* If seek fails, assume we are at start of file.
*/
- if (BufFileSeek(PS(node)->psort_grab_file, 0,
- - (long) sizeof(tlendummy), SEEK_CUR))
+ if (! LogicalTapeBackspace(PS(node)->tapeset,
+ PS(node)->psort_grab_tape,
+ sizeof(tlendummy)))
return NULL;
- GETLEN(tuplen, PS(node)->psort_grab_file);
- if (tuplen == 0)
- elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan");
+ GETLEN(node, tuplen, PS(node)->psort_grab_tape);
/*
* Back up to get ending length word of tuple before it.
*/
- if (BufFileSeek(PS(node)->psort_grab_file, 0,
- - (long) (tuplen + 2*sizeof(tlendummy)), SEEK_CUR))
+ if (! LogicalTapeBackspace(PS(node)->tapeset,
+ PS(node)->psort_grab_tape,
+ tuplen + 2*sizeof(tlendummy)))
{
/* If fail, presumably the prev tuple is the first in the file.
* Back up so that it becomes next to read in forward direction
* (not obviously right, but that is what in-memory case does)
*/
- if (BufFileSeek(PS(node)->psort_grab_file, 0,
- - (long) (tuplen + sizeof(tlendummy)), SEEK_CUR))
+ if (! LogicalTapeBackspace(PS(node)->tapeset,
+ PS(node)->psort_grab_tape,
+ tuplen + sizeof(tlendummy)))
elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan");
return NULL;
}
- GETLEN(tuplen, PS(node)->psort_grab_file);
+ GETLEN(node, tuplen, PS(node)->psort_grab_tape);
}
/*
* Note: GETTUP expects we are positioned after the initial length
* word of the tuple, so back up to that point.
*/
- if (BufFileSeek(PS(node)->psort_grab_file, 0,
- - (long) tuplen, SEEK_CUR))
+ if (! LogicalTapeBackspace(PS(node)->tapeset,
+ PS(node)->psort_grab_tape,
+ tuplen))
elog(ERROR, "psort_grabtuple: too big tuple len in backward scan");
tup = ALLOCTUP(tuplen);
- SETTUPLEN(tup, tuplen);
- GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
+ GETTUP(node, tup, tuplen, PS(node)->psort_grab_tape);
return tup;
}
else
Assert(PS(node) != (Psortstate *) NULL);
if (PS(node)->using_tape_files == true)
- BufFileTell(PS(node)->psort_grab_file,
- & PS(node)->psort_saved_fileno,
- & PS(node)->psort_saved);
+ LogicalTapeTell(PS(node)->tapeset,
+ PS(node)->psort_grab_tape,
+ & PS(node)->psort_saved,
+ & PS(node)->psort_saved_offset);
else
PS(node)->psort_saved = PS(node)->psort_current;
}
Assert(PS(node) != (Psortstate *) NULL);
if (PS(node)->using_tape_files == true)
- BufFileSeek(PS(node)->psort_grab_file,
- PS(node)->psort_saved_fileno,
- PS(node)->psort_saved,
- SEEK_SET);
+ {
+ if (! LogicalTapeSeek(PS(node)->tapeset,
+ PS(node)->psort_grab_tape,
+ PS(node)->psort_saved,
+ PS(node)->psort_saved_offset))
+ elog(ERROR, "psort_restorepos failed");
+ }
else
PS(node)->psort_current = PS(node)->psort_saved;
}
/*
- * psort_end - unlinks the tape files, and cleans up. Should not be
- * called unless psort_grabtuple has returned a NULL.
+ * psort_end
+ *
+ * Release resources and clean up.
*/
void
psort_end(Sort *node)
{
- struct tape *tp;
-
- if (!node->cleaned)
+ /* node->cleaned is probably redundant? */
+ if (!node->cleaned && PS(node) != (Psortstate *) NULL)
{
+ if (PS(node)->tapeset)
+ LogicalTapeSetClose(PS(node)->tapeset);
+ if (PS(node)->memtuples)
+ pfree(PS(node)->memtuples);
- /*
- * I'm changing this because if we are sorting a relation with no
- * tuples, psortstate is NULL.
- */
- if (PS(node) != (Psortstate *) NULL)
- {
- if (PS(node)->using_tape_files == true)
- for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
- destroytape(tp->tp_file);
- else if (PS(node)->memtuples)
- pfree(PS(node)->memtuples);
-
- NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ);
- NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ);
+ /* XXX what about freeing leftist tree and tuples in memory? */
- pfree((void *) node->psortstate);
- node->psortstate = NULL;
+ NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ);
+ NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ);
- node->cleaned = TRUE;
- }
+ pfree((void *) node->psortstate);
+ node->psortstate = NULL;
+ node->cleaned = TRUE;
}
}
if (((Plan *) node)->lefttree->chgParam != NULL)
{
psort_end(node);
- node->cleaned = false;
+ node->cleaned = false; /* huh? */
}
else if (PS(node) != (Psortstate *) NULL)
{
PS(node)->all_fetched = false;
PS(node)->psort_current = 0;
- PS(node)->psort_saved_fileno = 0;
PS(node)->psort_saved = 0L;
+ PS(node)->psort_saved_offset = 0;
if (PS(node)->using_tape_files == true)
- rewind(PS(node)->psort_grab_file);
+ LogicalTapeRewind(PS(node)->tapeset,
+ PS(node)->psort_grab_tape,
+ false);
}
}
-/*
- * gettape - returns an open stream for writing/reading
- *
- * Returns:
- * Open stream for writing/reading.
- * NULL if unable to open temporary file.
- *
- * There used to be a lot of cruft here to try to ensure that we destroyed
- * all the tape files; but it didn't really work. Now we rely on fd.c to
- * clean up temp files if an error occurs.
- */
-static BufFile *
-gettape()
-{
- return BufFileCreateTemp();
-}
-
-/*
- * destroytape - unlinks the tape
- */
-static void
-destroytape(BufFile *file)
-{
- BufFileClose(file);
-}
-
static int
_psort_cmp(HeapTuple *ltup, HeapTuple *rtup)
{
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: buffile.h,v 1.1 1999/10/13 15:02:32 tgl Exp $
+ * $Id: buffile.h,v 1.2 1999/10/16 19:49:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
extern BufFile *BufFileCreateTemp(void);
extern BufFile *BufFileCreate(File file);
-extern BufFile *BufFileReaccess(BufFile *file);
extern void BufFileClose(BufFile *file);
extern size_t BufFileRead(BufFile *file, void *ptr, size_t size);
extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size);
extern int BufFileSeek(BufFile *file, int fileno, long offset, int whence);
extern void BufFileTell(BufFile *file, int *fileno, long *offset);
+extern int BufFileSeekBlock(BufFile *file, long blknum);
+extern long BufFileTellBlock(BufFile *file);
#endif /* BUFFILE_H */
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * logtape.h
+ * Management of "logical tapes" within temporary files.
+ *
+ * See logtape.c for explanations.
+ *
+ * Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: logtape.h,v 1.1 1999/10/16 19:49:28 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef LOGTAPE_H
+#define LOGTAPE_H
+
+/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
+
+typedef struct LogicalTapeSet LogicalTapeSet;
+
+/*
+ * prototypes for functions in logtape.c
+ */
+
+extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
+extern void LogicalTapeSetClose(LogicalTapeSet *lts);
+extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
+ void *ptr, size_t size);
+extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
+ void *ptr, size_t size);
+extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
+extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
+extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
+ size_t size);
+extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
+ long blocknum, int offset);
+extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
+ long *blocknum, int *offset);
+
+#endif /* LOGTAPE_H */
/*-------------------------------------------------------------------------
*
* psort.h
- *
- *
+ * Polyphase merge sort.
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: psort.h,v 1.22 1999/10/13 15:02:28 tgl Exp $
+ * $Id: psort.h,v 1.23 1999/10/16 19:49:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef PSORT_H
#define PSORT_H
-#include "access/relscan.h"
+#include "access/htup.h"
+#include "access/skey.h"
#include "nodes/plannodes.h"
-#include "storage/buffile.h"
-#include "utils/lselect.h"
-
-#define MAXTAPES 7 /* See Knuth Fig. 70, p273 */
-
-struct tape
-{
- int tp_dummy; /* (D) */
- int tp_fib; /* (A) */
- BufFile *tp_file; /* (TAPE) */
- struct tape *tp_prev;
-};
-
-struct cmplist
-{
- int cp_attn; /* attribute number */
- int cp_num; /* comparison function code */
- int cp_rev; /* invert comparison flag */
- struct cmplist *cp_next; /* next in chain */
-};
-
-/* This structure preserves the state of psort between calls from different
- * nodes to its interface functions. Basically, it includes all of the global
- * variables in psort. In case you were wondering, pointers to these structures
- * are included in Sort node structures. -Rex 2.6.1995
- */
-typedef struct Psortstate
-{
- LeftistContextData treeContext;
-
- int TapeRange;
- int Level;
- int TotalDummy;
- struct tape Tape[MAXTAPES];
-
- int BytesRead;
- int BytesWritten;
- int tupcount;
-
- struct leftist *Tuples;
-
- BufFile *psort_grab_file;
- long psort_current; /* array index (only used if not tape) */
- int psort_saved_fileno; /* upper bits of psort_saved, if tape */
- long psort_saved; /* could be file offset, or array index */
- bool using_tape_files;
- bool all_fetched; /* this is for cursors */
-
- HeapTuple *memtuples;
-} Psortstate;
-
-#ifdef EBUG
-#include "storage/buf.h"
-#include "storage/bufmgr.h"
-
-#define PDEBUG(PROC, S1)\
-elog(DEBUG, "%s:%d>> PROC: %s.", __FILE__, __LINE__, S1)
-
-#define PDEBUG2(PROC, S1, D1)\
-elog(DEBUG, "%s:%d>> PROC: %s %d.", __FILE__, __LINE__, S1, D1)
-
-#define PDEBUG4(PROC, S1, D1, S2, D2)\
-elog(DEBUG, "%s:%d>> PROC: %s %d, %s %d.", __FILE__, __LINE__, S1, D1, S2, D2)
-
-#define VDEBUG(VAR, FMT)\
-elog(DEBUG, "%s:%d>> VAR =FMT", __FILE__, __LINE__, VAR)
-
-#define ASSERT(EXPR, STR)\
-if (!(EXPR)) elog(FATAL, "%s:%d>> %s", __FILE__, __LINE__, STR)
-
-#define TRACE(VAL, CODE)\
-if (1) CODE; else
-
-#else
-#define PDEBUG(MSG)
-#define VDEBUG(VAR, FMT)
-#define ASSERT(EXPR, MSG)
-#define TRACE(VAL, CODE)
-#endif
-/* psort.c */
extern bool psort_begin(Sort *node, int nkeys, ScanKey key);
extern HeapTuple psort_grabtuple(Sort *node, bool *should_free);
extern void psort_markpos(Sort *node);