Second phase of psort reconstruction project: add bookkeeping logic to

recycle storage within sort temp file on a block-by-block basis.  This
reduces peak disk usage to essentially just the volume of data being
sorted, whereas it had been about 4x the data volume before.
This commit is contained in:
Tom Lane 1999-10-16 19:49:28 +00:00
parent 357231e68e
commit 957146dcec
7 changed files with 1294 additions and 453 deletions

View File

@ -6,7 +6,7 @@
* Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/storage/file/buffile.c,v 1.1 1999/10/13 15:02:29 tgl Exp $
* $Header: /cvsroot/pgsql/src/backend/storage/file/buffile.c,v 1.2 1999/10/16 19:49:26 tgl Exp $
*
* NOTES:
*
@ -27,10 +27,7 @@
*
* BufFile also supports temporary files that exceed the OS file size limit
* (by opening multiple fd.c temporary files). This is an essential feature
* for sorts and hashjoins on large amounts of data. It is possible to have
* more than one BufFile reading/writing the same temp file, although the
* caller is responsible for avoiding ill effects from buffer overlap when
* this is done.
* for sorts and hashjoins on large amounts of data.
*-------------------------------------------------------------------------
*/
@ -48,33 +45,24 @@
#define MAX_PHYSICAL_FILESIZE (RELSEG_SIZE * BLCKSZ)
/*
* To handle multiple BufFiles on a single logical temp file, we use this
* data structure representing a logical file (which can be made up of
* multiple physical files to get around the OS file size limit).
* This data structure represents a buffered file that consists of one or
* more physical files (each accessed through a virtual file descriptor
* managed by fd.c).
*/
typedef struct LogicalFile
struct BufFile
{
int refCount; /* number of BufFiles using me */
bool isTemp; /* can only add files if this is TRUE */
int numFiles; /* number of physical files in set */
/* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */
File *files; /* palloc'd array with numFiles entries */
long *offsets; /* palloc'd array with numFiles entries */
/* offsets[i] is the current seek position of files[i]. We use this
* to avoid making redundant FileSeek calls.
*/
} LogicalFile;
/*
* A single file buffer looks like this.
*/
struct BufFile
{
LogicalFile *logFile; /* the underlying LogicalFile */
bool isTemp; /* can only add files if this is TRUE */
bool dirty; /* does buffer need to be written? */
/*
* "current pos" is position of start of buffer within LogicalFile.
* "current pos" is position of start of buffer within the logical file.
* Position as seen by user of BufFile is (curFile, curOffset + pos).
*/
int curFile; /* file index (0..n) part of current pos */
@ -84,30 +72,33 @@ struct BufFile
char buffer[BLCKSZ];
};
static LogicalFile *makeLogicalFile(File firstfile);
static void extendLogicalFile(LogicalFile *file);
static void deleteLogicalFile(LogicalFile *file);
static BufFile *makeBufFile(File firstfile);
static void extendBufFile(BufFile *file);
static void BufFileLoadBuffer(BufFile *file);
static void BufFileDumpBuffer(BufFile *file);
static int BufFileFlush(BufFile *file);
/*
* Create a LogicalFile with one component file and refcount 1.
* Create a BufFile given the first underlying physical file.
* NOTE: caller must set isTemp true if appropriate.
*/
static LogicalFile *
makeLogicalFile(File firstfile)
static BufFile *
makeBufFile(File firstfile)
{
LogicalFile *file = (LogicalFile *) palloc(sizeof(LogicalFile));
BufFile *file = (BufFile *) palloc(sizeof(BufFile));
file->refCount = 1;
file->isTemp = false;
file->numFiles = 1;
file->files = (File *) palloc(sizeof(File));
file->files[0] = firstfile;
file->offsets = (long *) palloc(sizeof(long));
file->offsets[0] = 0L;
file->isTemp = false;
file->dirty = false;
file->curFile = 0;
file->curOffset = 0L;
file->pos = 0;
file->nbytes = 0;
return file;
}
@ -116,7 +107,7 @@ makeLogicalFile(File firstfile)
* Add another component temp file.
*/
static void
extendLogicalFile(LogicalFile *file)
extendBufFile(BufFile *file)
{
File pfile;
@ -133,21 +124,6 @@ extendLogicalFile(LogicalFile *file)
file->numFiles++;
}
/*
* Close and delete a LogicalFile when its refCount has gone to zero.
*/
static void
deleteLogicalFile(LogicalFile *file)
{
int i;
for (i = 0; i < file->numFiles; i++)
FileClose(file->files[i]);
pfree(file->files);
pfree(file->offsets);
pfree(file);
}
/*
* Create a BufFile for a new temporary file (which will expand to become
* multiple temporary files if more than MAX_PHYSICAL_FILESIZE bytes are
@ -156,24 +132,16 @@ deleteLogicalFile(LogicalFile *file)
BufFile *
BufFileCreateTemp(void)
{
BufFile *bfile = (BufFile *) palloc(sizeof(BufFile));
BufFile *file;
File pfile;
LogicalFile *lfile;
pfile = OpenTemporaryFile();
Assert(pfile >= 0);
lfile = makeLogicalFile(pfile);
lfile->isTemp = true;
file = makeBufFile(pfile);
file->isTemp = true;
bfile->logFile = lfile;
bfile->dirty = false;
bfile->curFile = 0;
bfile->curOffset = 0L;
bfile->pos = 0;
bfile->nbytes = 0;
return bfile;
return file;
}
/*
@ -186,42 +154,7 @@ BufFileCreateTemp(void)
BufFile *
BufFileCreate(File file)
{
BufFile *bfile = (BufFile *) palloc(sizeof(BufFile));
LogicalFile *lfile;
lfile = makeLogicalFile(file);
bfile->logFile = lfile;
bfile->dirty = false;
bfile->curFile = 0;
bfile->curOffset = 0L;
bfile->pos = 0;
bfile->nbytes = 0;
return bfile;
}
/*
* Create an additional BufFile accessing the same underlying file as an
* existing BufFile. This is useful for having multiple read/write access
* positions in a single temporary file. Note the caller is responsible
* for avoiding trouble due to overlapping buffer positions! (Caller may
* assume that buffer size is BLCKSZ...)
*/
BufFile *
BufFileReaccess(BufFile *file)
{
BufFile *bfile = (BufFile *) palloc(sizeof(BufFile));
bfile->logFile = file->logFile;
bfile->logFile->refCount++;
bfile->dirty = false;
bfile->curFile = 0;
bfile->curOffset = 0L;
bfile->pos = 0;
bfile->nbytes = 0;
return bfile;
return makeBufFile(file);
}
/*
@ -232,16 +165,21 @@ BufFileReaccess(BufFile *file)
void
BufFileClose(BufFile *file)
{
int i;
/* flush any unwritten data */
BufFileFlush(file);
/* close the underlying (with delete if it's a temp file) */
if (--(file->logFile->refCount) <= 0)
deleteLogicalFile(file->logFile);
/* close the underlying file(s) (with delete if it's a temp file) */
for (i = 0; i < file->numFiles; i++)
FileClose(file->files[i]);
/* release the buffer space */
pfree(file->files);
pfree(file->offsets);
pfree(file);
}
/* BufFileLoadBuffer
/*
* BufFileLoadBuffer
*
* Load some data into buffer, if possible, starting from curOffset.
* At call, must have dirty = false, pos and nbytes = 0.
@ -250,7 +188,6 @@ BufFileClose(BufFile *file)
static void
BufFileLoadBuffer(BufFile *file)
{
LogicalFile *lfile = file->logFile;
File thisfile;
/*
@ -261,30 +198,33 @@ BufFileLoadBuffer(BufFile *file)
* MAX_PHYSICAL_FILESIZE.
*/
if (file->curOffset >= MAX_PHYSICAL_FILESIZE &&
file->curFile+1 < lfile->numFiles)
file->curFile+1 < file->numFiles)
{
file->curFile++;
file->curOffset = 0L;
}
thisfile = lfile->files[file->curFile];
/*
* May need to reposition physical file, if more than one BufFile
* is using it.
* May need to reposition physical file.
*/
if (file->curOffset != lfile->offsets[file->curFile])
thisfile = file->files[file->curFile];
if (file->curOffset != file->offsets[file->curFile])
{
if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset)
return; /* seek failed, read nothing */
lfile->offsets[file->curFile] = file->curOffset;
file->offsets[file->curFile] = file->curOffset;
}
/*
* Read whatever we can get, up to a full bufferload.
*/
file->nbytes = FileRead(thisfile, file->buffer, sizeof(file->buffer));
if (file->nbytes < 0)
file->nbytes = 0;
lfile->offsets[file->curFile] += file->nbytes;
file->offsets[file->curFile] += file->nbytes;
/* we choose not to advance curOffset here */
}
/* BufFileDumpBuffer
/*
* BufFileDumpBuffer
*
* Dump buffer contents starting at curOffset.
* At call, should have dirty = true, nbytes > 0.
@ -293,7 +233,6 @@ BufFileLoadBuffer(BufFile *file)
static void
BufFileDumpBuffer(BufFile *file)
{
LogicalFile *lfile = file->logFile;
int wpos = 0;
int bytestowrite;
File thisfile;
@ -307,10 +246,10 @@ BufFileDumpBuffer(BufFile *file)
/*
* Advance to next component file if necessary and possible.
*/
if (file->curOffset >= MAX_PHYSICAL_FILESIZE && lfile->isTemp)
if (file->curOffset >= MAX_PHYSICAL_FILESIZE && file->isTemp)
{
while (file->curFile+1 >= lfile->numFiles)
extendLogicalFile(lfile);
while (file->curFile+1 >= file->numFiles)
extendBufFile(file);
file->curFile++;
file->curOffset = 0L;
}
@ -319,28 +258,27 @@ BufFileDumpBuffer(BufFile *file)
* to write as much as asked...
*/
bytestowrite = file->nbytes - wpos;
if (lfile->isTemp)
if (file->isTemp)
{
long availbytes = MAX_PHYSICAL_FILESIZE - file->curOffset;
if ((long) bytestowrite > availbytes)
bytestowrite = (int) availbytes;
}
thisfile = lfile->files[file->curFile];
/*
* May need to reposition physical file, if more than one BufFile
* is using it.
* May need to reposition physical file.
*/
if (file->curOffset != lfile->offsets[file->curFile])
thisfile = file->files[file->curFile];
if (file->curOffset != file->offsets[file->curFile])
{
if (FileSeek(thisfile, file->curOffset, SEEK_SET) != file->curOffset)
return; /* seek failed, give up */
lfile->offsets[file->curFile] = file->curOffset;
file->offsets[file->curFile] = file->curOffset;
}
bytestowrite = FileWrite(thisfile, file->buffer, bytestowrite);
if (bytestowrite <= 0)
return; /* failed to write */
lfile->offsets[file->curFile] += bytestowrite;
file->offsets[file->curFile] += bytestowrite;
file->curOffset += bytestowrite;
wpos += bytestowrite;
}
@ -363,7 +301,8 @@ BufFileDumpBuffer(BufFile *file)
file->nbytes = 0;
}
/* BufFileRead
/*
* BufFileRead
*
* Like fread() except we assume 1-byte element size.
*/
@ -409,7 +348,8 @@ BufFileRead(BufFile *file, void *ptr, size_t size)
return nread;
}
/* BufFileWrite
/*
* BufFileWrite
*
* Like fwrite() except we assume 1-byte element size.
*/
@ -458,7 +398,8 @@ BufFileWrite(BufFile *file, void *ptr, size_t size)
return nwritten;
}
/* BufFileFlush
/*
* BufFileFlush
*
* Like fflush()
*/
@ -475,9 +416,15 @@ BufFileFlush(BufFile *file)
return 0;
}
/* BufFileSeek
/*
* BufFileSeek
*
* Like fseek(). Result is 0 if OK, EOF if not.
* Like fseek(), except that target position needs two values in order to
* work when logical filesize exceeds maximum value representable by long.
* We do not support relative seeks across more than LONG_MAX, however.
*
* Result is 0 if OK, EOF if not. Logical position is not moved if an
* impossible seek is attempted.
*/
int
BufFileSeek(BufFile *file, int fileno, long offset, int whence)
@ -487,7 +434,7 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence)
switch (whence)
{
case SEEK_SET:
if (fileno < 0 || fileno >= file->logFile->numFiles ||
if (fileno < 0 || fileno >= file->numFiles ||
offset < 0)
return EOF;
newFile = fileno;
@ -516,11 +463,11 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence)
return EOF;
newOffset += MAX_PHYSICAL_FILESIZE;
}
if (file->logFile->isTemp)
if (file->isTemp)
{
while (newOffset > MAX_PHYSICAL_FILESIZE)
{
if (++newFile >= file->logFile->numFiles)
if (++newFile >= file->numFiles)
return EOF;
newOffset -= MAX_PHYSICAL_FILESIZE;
}
@ -548,9 +495,44 @@ BufFileSeek(BufFile *file, int fileno, long offset, int whence)
return 0;
}
extern void
void
BufFileTell(BufFile *file, int *fileno, long *offset)
{
*fileno = file->curFile;
*offset = file->curOffset + file->pos;
}
/*
* BufFileSeekBlock --- block-oriented seek
*
* Performs absolute seek to the start of the n'th BLCKSZ-sized block of
* the file. Note that users of this interface will fail if their files
* exceed BLCKSZ * LONG_MAX bytes, but that is quite a lot; we don't work
* with tables bigger than that, either...
*
* Result is 0 if OK, EOF if not. Logical position is not moved if an
* impossible seek is attempted.
*/
int
BufFileSeekBlock(BufFile *file, long blknum)
{
return BufFileSeek(file,
(int) (blknum / RELSEG_SIZE),
(blknum % RELSEG_SIZE) * BLCKSZ,
SEEK_SET);
}
/*
* BufFileTellBlock --- block-oriented tell
*
* Any fractional part of a block in the current seek position is ignored.
*/
long
BufFileTellBlock(BufFile *file)
{
long blknum;
blknum = (file->curOffset + file->pos) / BLCKSZ;
blknum += file->curFile * RELSEG_SIZE;
return blknum;
}

View File

@ -4,7 +4,7 @@
# Makefile for utils/sort
#
# IDENTIFICATION
# $Header: /cvsroot/pgsql/src/backend/utils/sort/Makefile,v 1.5 1998/04/06 00:27:37 momjian Exp $
# $Header: /cvsroot/pgsql/src/backend/utils/sort/Makefile,v 1.6 1999/10/16 19:49:27 tgl Exp $
#
#-------------------------------------------------------------------------
@ -13,7 +13,7 @@ include ../../../Makefile.global
CFLAGS += -I../..
OBJS = lselect.o psort.o
OBJS = logtape.o lselect.o psort.o
all: SUBSYS.o

View File

@ -0,0 +1,903 @@
/*-------------------------------------------------------------------------
*
* logtape.c
* Management of "logical tapes" within temporary files.
*
* This module exists to support sorting via multiple merge passes (see
* psort.c). Merging is an ideal algorithm for tape devices, but if we
* implement it on disk by creating a separate file for each "tape",
* there is an annoying problem: the peak space usage is at least twice
* the volume of actual data to be sorted. (This must be so because each
* datum will appear in both the input and output tapes of the final
* merge pass. For seven-tape polyphase merge, which is otherwise a
* pretty good algorithm, peak usage is more like 4x actual data volume.)
*
* We can work around this problem by recognizing that any one tape
* dataset (with the possible exception of the final output) is written
* and read exactly once in a perfectly sequential manner. Therefore,
* a datum once read will not be required again, and we can recycle its
* space for use by the new tape dataset(s) being generated. In this way,
* the total space usage is essentially just the actual data volume, plus
* insignificant bookkeeping and start/stop overhead.
*
* Few OSes allow arbitrary parts of a file to be released back to the OS,
* so we have to implement this space-recycling ourselves within a single
* logical file. logtape.c exists to perform this bookkeeping and provide
* the illusion of N independent tape devices to psort.c. Note that
* logtape.c itself depends on buffile.c to provide a "logical file" of
* larger size than the underlying OS may support.
*
* For simplicity, we allocate and release space in the underlying file
* in BLCKSZ-size blocks. Space allocation boils down to keeping track
* of which blocks in the underlying file belong to which logical tape,
* plus any blocks that are free (recycled and not yet reused). Normally
* there are not very many free blocks, so we just keep those in a list.
* The blocks in each logical tape are remembered using a method borrowed
* from the Unix HFS filesystem: we store data block numbers in an
* "indirect block". If an indirect block fills up, we write it out to
* the underlying file and remember its location in a second-level indirect
* block. In the same way second-level blocks are remembered in third-
* level blocks, and so on if necessary (of course we're talking huge
* amounts of data here). The topmost indirect block of a given logical
* tape is never actually written out to the physical file, but all lower-
* level indirect blocks will be.
*
* The initial write pass is guaranteed to fill the underlying file
* perfectly sequentially, no matter how data is divided into logical tapes.
* Once we begin merge passes, the access pattern becomes considerably
* less predictable --- but the seeking involved should be comparable to
* what would happen if we kept each logical tape in a separate file,
* so there's no serious performance penalty paid to obtain the space
* savings of recycling. We try to localize the write accesses by always
* writing to the lowest-numbered free block when we have a choice; it's
* not clear this helps much, but it can't hurt. (XXX perhaps a LIFO
* policy for free blocks would be better?)
*
* Since all the bookkeeping and buffer memory is allocated with palloc(),
* and the underlying file(s) are made with OpenTemporaryFile, all resources
* for a logical tape set are certain to be cleaned up even if processing
* is aborted by elog(ERROR). To avoid confusion, the caller should take
* care that all calls for a single LogicalTapeSet are made in the same
* palloc context.
*
* Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/sort/logtape.c,v 1.1 1999/10/16 19:49:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "storage/buffile.h"
#include "utils/logtape.h"
/*
* Block indexes are "long"s, so we can fit this many per indirect block.
* NB: we assume this is an exact fit!
*/
#define BLOCKS_PER_INDIR_BLOCK (BLCKSZ / sizeof(long))
/*
* We use a struct like this for each active indirection level of each
* logical tape. If the indirect block is not the highest level of its
* tape, the "nextup" link points to the next higher level. Only the
* "ptrs" array is written out if we have to dump the indirect block to
* disk. If "ptrs" is not completely full, we store -1L in the first
* unused slot at completion of the write phase for the logical tape.
*/
typedef struct IndirectBlock
{
int nextSlot; /* next pointer slot to write or read */
struct IndirectBlock *nextup; /* parent indirect level, or NULL if top */
long ptrs[BLOCKS_PER_INDIR_BLOCK]; /* indexes of contained blocks */
} IndirectBlock;
/*
* This data structure represents a single "logical tape" within the set
* of logical tapes stored in the same file. We must keep track of the
* current partially-read-or-written data block as well as the active
* indirect block level(s).
*/
typedef struct LogicalTape
{
IndirectBlock *indirect; /* bottom of my indirect-block hierarchy */
bool writing; /* T while in write phase */
bool frozen; /* T if blocks should not be freed when read */
bool dirty; /* does buffer need to be written? */
/*
* The total data volume in the logical tape is numFullBlocks * BLCKSZ
* + lastBlockBytes. BUT: we do not update lastBlockBytes during writing,
* only at completion of a write phase.
*/
long numFullBlocks; /* number of complete blocks in log tape */
int lastBlockBytes; /* valid bytes in last (incomplete) block */
/*
* Buffer for current data block. Note we don't bother to store the
* actual file block number of the data block (during the write phase
* it hasn't been assigned yet, and during read we don't care anymore).
* But we do need the relative block number so we can detect end-of-tape
* while reading.
*/
long curBlockNumber; /* this block's logical blk# within tape */
int pos; /* next read/write position in buffer */
int nbytes; /* total # of valid bytes in buffer */
char buffer[BLCKSZ];
} LogicalTape;
/*
* This data structure represents a set of related "logical tapes" sharing
* space in a single underlying file. (But that "file" may be multiple files
* if needed to escape OS limits on file size; buffile.c handles that for us.)
* The number of tapes is fixed at creation.
*/
struct LogicalTapeSet
{
BufFile *pfile; /* underlying file for whole tape set */
long nFileBlocks; /* # of blocks used in underlying file */
/*
* We store the numbers of recycled-and-available blocks in freeBlocks[].
* When there are no such blocks, we extend the underlying file. Note
* that the block numbers in freeBlocks are always in *decreasing* order,
* so that removing the last entry gives us the lowest free block.
*/
long *freeBlocks; /* resizable array */
int nFreeBlocks; /* # of currently free blocks */
int freeBlocksLen; /* current allocated length of freeBlocks[] */
/*
* tapes[] is declared size 1 since C wants a fixed size, but actually
* it is of length nTapes.
*/
int nTapes; /* # of logical tapes in set */
LogicalTape *tapes[1]; /* must be last in struct! */
};
static void ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static void ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer);
static long ltsGetFreeBlock(LogicalTapeSet *lts);
static void ltsReleaseBlock(LogicalTapeSet *lts, long blocknum);
static void ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
long blocknum);
static long ltsRewindIndirectBlock(LogicalTapeSet *lts,
IndirectBlock *indirect,
bool freezing);
static long ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
IndirectBlock *indirect);
static long ltsRecallNextBlockNum(LogicalTapeSet *lts,
IndirectBlock *indirect,
bool frozen);
static long ltsRecallPrevBlockNum(LogicalTapeSet *lts,
IndirectBlock *indirect);
static void ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt);
/*
* Write a block-sized buffer to the specified block of the underlying file.
*
* NB: should not attempt to write beyond current end of file (ie, create
* "holes" in file), since BufFile doesn't allow that. The first write pass
* must write blocks sequentially.
*
* No need for an error return convention; we elog() on any error.
*/
static void
ltsWriteBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
{
if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
BufFileWrite(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
elog(ERROR, "ltsWriteBlock: failed to write block %ld of temporary file\n\t\tPerhaps out of disk space?",
blocknum);
}
/*
* Read a block-sized buffer from the specified block of the underlying file.
*
* No need for an error return convention; we elog() on any error. This
* module should never attempt to read a block it doesn't know is there.
*/
static void
ltsReadBlock(LogicalTapeSet *lts, long blocknum, void *buffer)
{
if (BufFileSeekBlock(lts->pfile, blocknum) != 0 ||
BufFileRead(lts->pfile, buffer, BLCKSZ) != BLCKSZ)
elog(ERROR, "ltsReadBlock: failed to read block %ld of temporary file",
blocknum);
}
/*
* Select a currently unused block for writing to.
*
* NB: should only be called when writer is ready to write immediately,
* to ensure that first write pass is sequential.
*/
static long
ltsGetFreeBlock(LogicalTapeSet *lts)
{
/* If there are multiple free blocks, we select the one appearing last
* in freeBlocks[]. If there are none, assign the next block at the end
* of the file.
*/
if (lts->nFreeBlocks > 0)
return lts->freeBlocks[--lts->nFreeBlocks];
else
return lts->nFileBlocks++;
}
/*
* Return a block# to the freelist.
*/
static void
ltsReleaseBlock(LogicalTapeSet *lts, long blocknum)
{
int ndx;
long *ptr;
/*
* Enlarge freeBlocks array if full.
*/
if (lts->nFreeBlocks >= lts->freeBlocksLen)
{
lts->freeBlocksLen *= 2;
lts->freeBlocks = (long *) repalloc(lts->freeBlocks,
lts->freeBlocksLen * sizeof(long));
}
/*
* Insert blocknum into array, preserving decreasing order (so that
* ltsGetFreeBlock returns the lowest available block number).
* This could get fairly slow if there were many free blocks, but
* we don't expect there to be very many at one time.
*/
ndx = lts->nFreeBlocks++;
ptr = lts->freeBlocks + ndx;
while (ndx > 0 && ptr[-1] < blocknum)
{
ptr[0] = ptr[-1];
ndx--, ptr--;
}
ptr[0] = blocknum;
}
/*
* These routines manipulate indirect-block hierarchies. All are recursive
* so that they don't have any specific limit on the depth of hierarchy.
*/
/*
* Record a data block number in a logical tape's lowest indirect block,
* or record an indirect block's number in the next higher indirect level.
*/
static void
ltsRecordBlockNum(LogicalTapeSet *lts, IndirectBlock *indirect,
long blocknum)
{
if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK)
{
/*
* This indirect block is full, so dump it out and recursively
* save its address in the next indirection level. Create a
* new indirection level if there wasn't one before.
*/
long indirblock = ltsGetFreeBlock(lts);
ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
if (indirect->nextup == NULL)
{
indirect->nextup = (IndirectBlock *) palloc(sizeof(IndirectBlock));
indirect->nextup->nextSlot = 0;
indirect->nextup->nextup = NULL;
}
ltsRecordBlockNum(lts, indirect->nextup, indirblock);
/*
* Reset to fill another indirect block at this level.
*/
indirect->nextSlot = 0;
}
indirect->ptrs[indirect->nextSlot++] = blocknum;
}
/*
* Reset a logical tape's indirect-block hierarchy after a write pass
* to prepare for reading. We dump out partly-filled blocks except
* at the top of the hierarchy, and we rewind each level to the start.
* This call returns the first data block number, or -1L if the tape
* is empty.
*
* Unless 'freezing' is true, release indirect blocks to the free pool after
* reading them.
*/
static long
ltsRewindIndirectBlock(LogicalTapeSet *lts,
IndirectBlock *indirect,
bool freezing)
{
/* Insert sentinel if block is not full */
if (indirect->nextSlot < BLOCKS_PER_INDIR_BLOCK)
indirect->ptrs[indirect->nextSlot] = -1L;
/*
* If block is not topmost, write it out, and recurse to obtain
* address of first block in this hierarchy level. Read that one in.
*/
if (indirect->nextup != NULL)
{
long indirblock = ltsGetFreeBlock(lts);
ltsWriteBlock(lts, indirblock, (void *) indirect->ptrs);
ltsRecordBlockNum(lts, indirect->nextup, indirblock);
indirblock = ltsRewindIndirectBlock(lts, indirect->nextup, freezing);
Assert(indirblock != -1L);
ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
if (! freezing)
ltsReleaseBlock(lts, indirblock);
}
/*
* Reset my next-block pointer, and then fetch a block number if any.
*/
indirect->nextSlot = 0;
if (indirect->ptrs[0] == -1L)
return -1L;
return indirect->ptrs[indirect->nextSlot++];
}
/*
* Rewind a previously-frozen indirect-block hierarchy for another read pass.
* This call returns the first data block number, or -1L if the tape
* is empty.
*/
static long
ltsRewindFrozenIndirectBlock(LogicalTapeSet *lts,
IndirectBlock *indirect)
{
/*
* If block is not topmost, recurse to obtain
* address of first block in this hierarchy level. Read that one in.
*/
if (indirect->nextup != NULL)
{
long indirblock;
indirblock = ltsRewindFrozenIndirectBlock(lts, indirect->nextup);
Assert(indirblock != -1L);
ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
}
/*
* Reset my next-block pointer, and then fetch a block number if any.
*/
indirect->nextSlot = 0;
if (indirect->ptrs[0] == -1L)
return -1L;
return indirect->ptrs[indirect->nextSlot++];
}
/*
* Obtain next data block number in the forward direction, or -1L if no more.
*
* Unless 'frozen' is true, release indirect blocks to the free pool after
* reading them.
*/
static long
ltsRecallNextBlockNum(LogicalTapeSet *lts,
IndirectBlock *indirect,
bool frozen)
{
if (indirect->nextSlot >= BLOCKS_PER_INDIR_BLOCK ||
indirect->ptrs[indirect->nextSlot] == -1L)
{
long indirblock;
if (indirect->nextup == NULL)
return -1L; /* nothing left at this level */
indirblock = ltsRecallNextBlockNum(lts, indirect->nextup, frozen);
if (indirblock == -1L)
return -1L; /* nothing left at this level */
ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
if (! frozen)
ltsReleaseBlock(lts, indirblock);
indirect->nextSlot = 0;
}
if (indirect->ptrs[indirect->nextSlot] == -1L)
return -1L;
return indirect->ptrs[indirect->nextSlot++];
}
/*
* Obtain next data block number in the reverse direction, or -1L if no more.
*
* Note this fetches the block# before the one last returned, no matter which
* direction of call returned that one. If we fail, no change in state.
*
* This routine can only be used in 'frozen' state, so there's no need to
* pass a parameter telling whether to release blocks ... we never do.
*/
static long
ltsRecallPrevBlockNum(LogicalTapeSet *lts,
IndirectBlock *indirect)
{
if (indirect->nextSlot <= 1)
{
long indirblock;
if (indirect->nextup == NULL)
return -1L; /* nothing left at this level */
indirblock = ltsRecallPrevBlockNum(lts, indirect->nextup);
if (indirblock == -1L)
return -1L; /* nothing left at this level */
ltsReadBlock(lts, indirblock, (void *) indirect->ptrs);
/* The previous block would only have been written out if full,
* so we need not search it for a -1 sentinel.
*/
indirect->nextSlot = BLOCKS_PER_INDIR_BLOCK+1;
}
indirect->nextSlot--;
return indirect->ptrs[indirect->nextSlot-1];
}
/*
* Create a set of logical tapes in a temporary underlying file.
*
* Each tape is initialized in write state.
*/
LogicalTapeSet *
LogicalTapeSetCreate(int ntapes)
{
LogicalTapeSet *lts;
LogicalTape *lt;
int i;
/*
* Create top-level struct. First LogicalTape pointer is already
* counted in sizeof(LogicalTapeSet).
*/
Assert(ntapes > 0);
lts = (LogicalTapeSet *) palloc(sizeof(LogicalTapeSet) +
(ntapes-1) * sizeof(LogicalTape *));
lts->pfile = BufFileCreateTemp();
lts->nFileBlocks = 0L;
lts->freeBlocksLen = 32; /* reasonable initial guess */
lts->freeBlocks = (long *) palloc(lts->freeBlocksLen * sizeof(long));
lts->nFreeBlocks = 0;
lts->nTapes = ntapes;
/*
* Create per-tape structs, including first-level indirect blocks.
*/
for (i = 0; i < ntapes; i++)
{
lt = (LogicalTape *) palloc(sizeof(LogicalTape));
lts->tapes[i] = lt;
lt->indirect = (IndirectBlock *) palloc(sizeof(IndirectBlock));
lt->indirect->nextSlot = 0;
lt->indirect->nextup = NULL;
lt->writing = true;
lt->frozen = false;
lt->dirty = false;
lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
}
return lts;
}
/*
* Close a logical tape set and release all resources.
*/
void LogicalTapeSetClose(LogicalTapeSet *lts)
{
LogicalTape *lt;
IndirectBlock *ib,
*nextib;
int i;
BufFileClose(lts->pfile);
for (i = 0; i < lts->nTapes; i++)
{
lt = lts->tapes[i];
for (ib = lt->indirect; ib != NULL; ib = nextib)
{
nextib = ib->nextup;
pfree(ib);
}
pfree(lt);
}
pfree(lts->freeBlocks);
pfree(lts);
}
/*
* Dump the dirty buffer of a logical tape.
*/
static void
ltsDumpBuffer(LogicalTapeSet *lts, LogicalTape *lt)
{
long datablock = ltsGetFreeBlock(lts);
Assert(lt->dirty);
ltsWriteBlock(lts, datablock, (void *) lt->buffer);
ltsRecordBlockNum(lts, lt->indirect, datablock);
lt->dirty = false;
/* Caller must do other state update as needed */
}
/*
* Write to a logical tape.
*
* There are no error returns; we elog() on failure.
*/
void
LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
void *ptr, size_t size)
{
LogicalTape *lt;
size_t nthistime;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = lts->tapes[tapenum];
Assert(lt->writing);
while (size > 0)
{
if (lt->pos >= BLCKSZ)
{
/* Buffer full, dump it out */
if (lt->dirty)
{
ltsDumpBuffer(lts, lt);
}
else
{
/* Hmm, went directly from reading to writing? */
elog(ERROR, "LogicalTapeWrite: impossible state");
}
lt->numFullBlocks++;
lt->curBlockNumber++;
lt->pos = 0;
lt->nbytes = 0;
}
nthistime = BLCKSZ - lt->pos;
if (nthistime > size)
nthistime = size;
Assert(nthistime > 0);
memcpy(lt->buffer + lt->pos, ptr, nthistime);
lt->dirty = true;
lt->pos += nthistime;
if (lt->nbytes < lt->pos)
lt->nbytes = lt->pos;
ptr = (void *) ((char *) ptr + nthistime);
size -= nthistime;
}
}
/*
* Rewind logical tape and switch from writing to reading or vice versa.
*
* Unless the tape has been "frozen" in read state, forWrite must be the
* opposite of the previous tape state.
*/
void
LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite)
{
LogicalTape *lt;
long datablocknum;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = lts->tapes[tapenum];
if (! forWrite)
{
if (lt->writing)
{
/*
* Completion of a write phase. Flush last partial data
* block, flush any partial indirect blocks, rewind for
* normal (destructive) read.
*/
if (lt->dirty)
ltsDumpBuffer(lts, lt);
lt->lastBlockBytes = lt->nbytes;
lt->writing = false;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, false);
}
else
{
/*
* This is only OK if tape is frozen; we rewind for (another)
* read pass.
*/
Assert(lt->frozen);
datablocknum = ltsRewindFrozenIndirectBlock(lts, lt->indirect);
}
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
if (datablocknum != -1L)
{
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
if (! lt->frozen)
ltsReleaseBlock(lts, datablocknum);
lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
BLCKSZ : lt->lastBlockBytes;
}
}
else
{
/*
* Completion of a read phase. Rewind and prepare for write.
*
* NOTE: we assume the caller has read the tape to the end;
* otherwise untouched data and indirect blocks will not have
* been freed. We could add more code to free any unread blocks,
* but in current usage of this module it'd be useless code.
*/
IndirectBlock *ib,
*nextib;
Assert(! lt->writing && ! lt->frozen);
/* Must truncate the indirect-block hierarchy down to one level. */
for (ib = lt->indirect->nextup; ib != NULL; ib = nextib)
{
nextib = ib->nextup;
pfree(ib);
}
lt->indirect->nextSlot = 0;
lt->indirect->nextup = NULL;
lt->writing = true;
lt->dirty = false;
lt->numFullBlocks = 0L;
lt->lastBlockBytes = 0;
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
}
}
/*
* Read from a logical tape.
*
* Early EOF is indicated by return value less than #bytes requested.
*/
size_t
LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
void *ptr, size_t size)
{
LogicalTape *lt;
size_t nread = 0;
size_t nthistime;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = lts->tapes[tapenum];
Assert(! lt->writing);
while (size > 0)
{
if (lt->pos >= lt->nbytes)
{
/* Try to load more data into buffer. */
long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
lt->frozen);
if (datablocknum == -1L)
break; /* EOF */
lt->curBlockNumber++;
lt->pos = 0;
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
if (! lt->frozen)
ltsReleaseBlock(lts, datablocknum);
lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
BLCKSZ : lt->lastBlockBytes;
if (lt->nbytes <= 0)
break; /* EOF (possible here?) */
}
nthistime = lt->nbytes - lt->pos;
if (nthistime > size)
nthistime = size;
Assert(nthistime > 0);
memcpy(ptr, lt->buffer + lt->pos, nthistime);
lt->pos += nthistime;
ptr = (void *) ((char *) ptr + nthistime);
size -= nthistime;
nread += nthistime;
}
return nread;
}
/*
* "Freeze" the contents of a tape so that it can be read multiple times
* and/or read backwards. Once a tape is frozen, its contents will not
* be released until the LogicalTapeSet is destroyed. This is expected
* to be used only for the final output pass of a merge.
*
* This *must* be called just at the end of a write pass, before the
* tape is rewound (after rewind is too late!). It performs a rewind
* and switch to read mode "for free". An immediately following rewind-
* for-read call is OK but not necessary.
*/
void
LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum)
{
LogicalTape *lt;
long datablocknum;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = lts->tapes[tapenum];
Assert(lt->writing);
/*
* Completion of a write phase. Flush last partial data
* block, flush any partial indirect blocks, rewind for
* nondestructive read.
*/
if (lt->dirty)
ltsDumpBuffer(lts, lt);
lt->lastBlockBytes = lt->nbytes;
lt->writing = false;
lt->frozen = true;
datablocknum = ltsRewindIndirectBlock(lts, lt->indirect, true);
/* Read the first block, or reset if tape is empty */
lt->curBlockNumber = 0L;
lt->pos = 0;
lt->nbytes = 0;
if (datablocknum != -1L)
{
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
BLCKSZ : lt->lastBlockBytes;
}
}
/*
* Backspace the tape a given number of bytes. (We also support a more
* general seek interface, see below.)
*
* *Only* a frozen-for-read tape can be backed up; we don't support
* random access during write, and an unfrozen read tape may have
* already discarded the desired data!
*
* Return value is TRUE if seek successful, FALSE if there isn't that much
* data before the current point (in which case there's no state change).
*/
bool
LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum, size_t size)
{
LogicalTape *lt;
long nblocks;
int newpos;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = lts->tapes[tapenum];
Assert(lt->frozen);
/*
* Easy case for seek within current block.
*/
if (size <= (size_t) lt->pos)
{
lt->pos -= (int) size;
return true;
}
/*
* Not-so-easy case. Figure out whether it's possible at all.
*/
size -= (size_t) lt->pos; /* part within this block */
nblocks = size / BLCKSZ;
size = size % BLCKSZ;
if (size)
{
nblocks++;
newpos = (int) (BLCKSZ - size);
}
else
newpos = 0;
if (nblocks > lt->curBlockNumber)
return false; /* a seek too far... */
/*
* OK, we need to back up nblocks blocks. This implementation
* would be pretty inefficient for long seeks, but we really
* aren't expecting that (a seek over one tuple is typical).
*/
while (nblocks-- > 0)
{
long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
if (datablocknum == -1L)
elog(ERROR, "LogicalTapeBackspace: unexpected end of tape");
lt->curBlockNumber--;
if (nblocks == 0)
{
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
lt->nbytes = BLCKSZ;
}
}
lt->pos = newpos;
return true;
}
/*
* Seek to an arbitrary position in a logical tape.
*
* *Only* a frozen-for-read tape can be seeked.
*
* Return value is TRUE if seek successful, FALSE if there isn't that much
* data in the tape (in which case there's no state change).
*/
bool
LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset)
{
LogicalTape *lt;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = lts->tapes[tapenum];
Assert(lt->frozen);
Assert(offset >= 0 && offset <= BLCKSZ);
/*
* Easy case for seek within current block.
*/
if (blocknum == lt->curBlockNumber && offset <= lt->nbytes)
{
lt->pos = offset;
return true;
}
/*
* Not-so-easy case. Figure out whether it's possible at all.
*/
if (blocknum < 0 || blocknum > lt->numFullBlocks ||
(blocknum == lt->numFullBlocks && offset > lt->lastBlockBytes))
return false;
/*
* OK, advance or back up to the target block. This implementation
* would be pretty inefficient for long seeks, but we really
* aren't expecting that (a seek over one tuple is typical).
*/
while (lt->curBlockNumber > blocknum)
{
long datablocknum = ltsRecallPrevBlockNum(lts, lt->indirect);
if (datablocknum == -1L)
elog(ERROR, "LogicalTapeSeek: unexpected end of tape");
if (--lt->curBlockNumber == blocknum)
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
}
while (lt->curBlockNumber < blocknum)
{
long datablocknum = ltsRecallNextBlockNum(lts, lt->indirect,
lt->frozen);
if (datablocknum == -1L)
elog(ERROR, "LogicalTapeSeek: unexpected end of tape");
if (++lt->curBlockNumber == blocknum)
ltsReadBlock(lts, datablocknum, (void *) lt->buffer);
}
lt->nbytes = (lt->curBlockNumber < lt->numFullBlocks) ?
BLCKSZ : lt->lastBlockBytes;
lt->pos = offset;
return true;
}
/*
* Obtain current position in a form suitable for a later LogicalTapeSeek.
*
* NOTE: it'd be OK to do this during write phase with intention of using
* the position for a seek after freezing. Not clear if anyone needs that.
*/
void
LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset)
{
LogicalTape *lt;
Assert(tapenum >= 0 && tapenum < lts->nTapes);
lt = lts->tapes[tapenum];
*blocknum = lt->curBlockNumber;
*offset = lt->pos;
}

View File

@ -1,136 +1,146 @@
/*
/*-------------------------------------------------------------------------
*
* psort.c
* Polyphase merge sort.
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: psort.c,v 1.57 1999/10/13 15:02:31 tgl Exp $
* See Knuth, volume 3, for more than you want to know about this algorithm.
*
* NOTES
* Sorts the first relation into the second relation.
*
* The old psort.c's routines formed a temporary relation from the merged
* sort files. This version keeps the files around instead of generating the
* relation from them, and provides interface functions to the file so that
* you can grab tuples, mark a position in the file, restore a position in the
* file. You must now explicitly call an interface function to end the sort,
* psort_end, when you are done.
* Now most of the global variables are stuck in the Sort nodes, and
* accessed from there (they are passed to all the psort routines) so that
* each sort running has its own separate state. This is facilitated by having
* the Sort nodes passed in to all the interface functions.
* The one global variable that all the sorts still share is SortMemory.
* You should now be allowed to run two or more psorts concurrently,
* so long as the memory they eat up is not greater than SORTMEM, the initial
* value of SortMemory. -Rex 2.15.1995
* This needs to be generalized to handle index tuples as well as heap tuples,
* so that the near-duplicate code in nbtsort.c can be eliminated. Also,
* I think it's got memory leak problems.
*
* Use the tape-splitting method (Knuth, Vol. III, pp281-86) in the future.
* Copyright (c) 1994, Regents of the University of California
*
* Arguments? Variables?
* MAXMERGE, MAXTAPES
* IDENTIFICATION
* $Header: /cvsroot/pgsql/src/backend/utils/sort/Attic/psort.c,v 1.58 1999/10/16 19:49:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#include <math.h>
#include <sys/types.h>
#include <unistd.h>
#include "postgres.h"
#include "access/heapam.h"
#include "access/relscan.h"
#include "executor/execdebug.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "utils/logtape.h"
#include "utils/lselect.h"
#include "utils/psort.h"
#define MAXTAPES 7 /* See Knuth Fig. 70, p273 */
struct tape
{
int tp_dummy; /* (D) */
int tp_fib; /* (A) */
int tp_tapenum; /* (TAPE) */
struct tape *tp_prev;
};
/*
* Private state of a Psort operation. The "psortstate" field in a Sort node
* points to one of these. This replaces a lot of global variables that used
* to be here...
*/
typedef struct Psortstate
{
LeftistContextData treeContext;
int TapeRange; /* number of tapes less 1 (T) */
int Level; /* Knuth's l */
int TotalDummy; /* sum of tp_dummy across all tapes */
struct tape Tape[MAXTAPES];
LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */
int BytesRead; /* I/O statistics (useless) */
int BytesWritten;
int tupcount;
struct leftist *Tuples; /* current tuple tree */
int psort_grab_tape; /* tape number of finished output data */
long psort_current; /* array index (only used if not tape) */
/* psort_saved(_offset) holds marked position for mark and restore */
long psort_saved; /* could be tape block#, or array index */
int psort_saved_offset; /* lower bits of psort_saved, if tape */
bool using_tape_files;
bool all_fetched; /* this is for cursors */
HeapTuple *memtuples;
} Psortstate;
/*
* PS - Macro to access and cast psortstate from a Sort node
*/
#define PS(N) ((Psortstate *)(N)->psortstate)
static bool createfirstrun(Sort *node);
static bool createrun(Sort *node, BufFile *file);
static void destroytape(BufFile *file);
static void dumptuples(BufFile *file, Sort *node);
static BufFile *gettape(void);
static bool createrun(Sort *node, int desttapenum);
static void dumptuples(Sort *node, int desttapenum);
static void initialrun(Sort *node);
static void inittapes(Sort *node);
static void merge(Sort *node, struct tape * dest);
static BufFile *mergeruns(Sort *node);
static int mergeruns(Sort *node);
static int _psort_cmp(HeapTuple *ltup, HeapTuple *rtup);
/*
* tlenzero used to delimit runs; both vars below must have
* the same size as HeapTuple->t_len
*/
static unsigned int tlenzero = 0;
static unsigned int tlendummy;
/* these are used by _psort_cmp, and are set just before calling qsort() */
static TupleDesc PsortTupDesc;
static ScanKey PsortKeys;
static int PsortNkeys;
/*
* old psort global variables
* tlenzero is used to write a zero to delimit runs, tlendummy is used
* to read in length words that we don't care about.
*
* (These are the global variables from the old psort. They are still used,
* but are now accessed from Sort nodes using the PS macro. Note that while
* these variables will be accessed by PS(node)->whatever, they will still
* be called by their original names within the comments! -Rex 2.10.1995)
*
* LeftistContextData treeContext;
*
* static int TapeRange; number of tapes - 1 (T)
* static int Level; (l)
* static int TotalDummy; summation of tp_dummy
* static struct tape *Tape;
*
* static int BytesRead; to keep track of # of IO
* static int BytesWritten;
*
* struct leftist *Tuples; current tuples in memory
*
* BufFile *psort_grab_file; this holds tuples grabbed
* from merged sort runs
* long psort_current; current file position
* long psort_saved; file position saved for
* mark and restore
* both vars must have the same size as HeapTuple->t_len
*/
static unsigned int tlenzero = 0;
static unsigned int tlendummy;
/*
* PS - Macro to access and cast psortstate from a Sort node
*/
#define PS(N) ((Psortstate *)N->psortstate)
/*
* psort_begin - polyphase merge sort entry point. Sorts the subplan
* into a temporary file psort_grab_file. After
* this is called, calling the interface function
* psort_grabtuple iteratively will get you the sorted
* tuples. psort_end then finishes the sort off, after
* all the tuples have been grabbed.
* psort_begin
*
* Allocates and initializes sort node's psort state.
* polyphase merge sort entry point. Sorts the subplan
* into memory or a temporary file. After
* this is called, calling the interface function
* psort_grabtuple iteratively will get you the sorted
* tuples. psort_end releases storage when done.
*
* Allocates and initializes sort node's psort state.
*/
bool
psort_begin(Sort *node, int nkeys, ScanKey key)
{
node->psortstate = (struct Psortstate *) palloc(sizeof(struct Psortstate));
AssertArg(nkeys >= 1);
AssertArg(key[0].sk_attno != 0);
AssertArg(key[0].sk_procedure != 0);
PS(node)->BytesRead = 0;
PS(node)->BytesWritten = 0;
node->psortstate = (void *) palloc(sizeof(struct Psortstate));
PS(node)->treeContext.tupDesc = ExecGetTupType(outerPlan((Plan *) node));
PS(node)->treeContext.nKeys = nkeys;
PS(node)->treeContext.scanKeys = key;
PS(node)->treeContext.sortMem = SortMem * 1024;
PS(node)->Tuples = NULL;
PS(node)->tapeset = NULL;
PS(node)->BytesRead = 0;
PS(node)->BytesWritten = 0;
PS(node)->tupcount = 0;
PS(node)->Tuples = NULL;
PS(node)->using_tape_files = false;
PS(node)->all_fetched = false;
PS(node)->psort_grab_file = NULL;
PS(node)->psort_grab_tape = -1;
PS(node)->memtuples = NULL;
initialrun(node);
@ -138,12 +148,12 @@ psort_begin(Sort *node, int nkeys, ScanKey key)
if (PS(node)->tupcount == 0)
return false;
if (PS(node)->using_tape_files && PS(node)->psort_grab_file == NULL)
PS(node)->psort_grab_file = mergeruns(node);
if (PS(node)->using_tape_files && PS(node)->psort_grab_tape == -1)
PS(node)->psort_grab_tape = mergeruns(node);
PS(node)->psort_current = 0;
PS(node)->psort_saved_fileno = 0;
PS(node)->psort_current = 0L;
PS(node)->psort_saved = 0L;
PS(node)->psort_saved_offset = 0;
return true;
}
@ -151,8 +161,8 @@ psort_begin(Sort *node, int nkeys, ScanKey key)
/*
* inittapes - initializes the tapes
* - (polyphase merge Alg.D(D1)--Knuth, Vol.3, p.270)
* Returns:
* number of allocated tapes
*
* This is called only if we have found we don't have room to sort in memory.
*/
static void
inittapes(Sort *node)
@ -163,16 +173,14 @@ inittapes(Sort *node)
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
/*
* ASSERT(ntapes >= 3 && ntapes <= MAXTAPES, "inittapes: Invalid
* number of tapes to initialize.\n");
*/
PS(node)->tapeset = LogicalTapeSetCreate(MAXTAPES);
tp = PS(node)->Tape;
for (i = 0; i < MAXTAPES && (tp->tp_file = gettape()) != NULL; i++)
for (i = 0; i < MAXTAPES; i++)
{
tp->tp_dummy = 1;
tp->tp_fib = 1;
tp->tp_tapenum = i;
tp->tp_prev = tp - 1;
tp++;
}
@ -181,10 +189,6 @@ inittapes(Sort *node)
tp->tp_fib = 0;
PS(node)->Tape[0].tp_prev = tp;
if (PS(node)->TapeRange <= 1)
elog(ERROR, "inittapes: Could only allocate %d < 3 tapes\n",
PS(node)->TapeRange + 1);
PS(node)->Level = 1;
PS(node)->TotalDummy = PS(node)->TapeRange;
@ -194,9 +198,9 @@ inittapes(Sort *node)
/*
* PUTTUP - writes the next tuple
* ENDRUN - mark end of run
* GETLEN - reads the length of the next tuple
* TRYGETLEN - reads the length of the next tuple, if any
* GETLEN - reads the length of the next tuple, must be one
* ALLOCTUP - returns space for the new tuple
* SETTUPLEN - stores the length into the tuple
* GETTUP - reads the tuple
*
* Note:
@ -204,31 +208,47 @@ inittapes(Sort *node)
*/
#define PUTTUP(NODE, TUP, FP) \
#define PUTTUP(NODE, TUP, TAPE) \
( \
(TUP)->t_len += HEAPTUPLESIZE, \
((Psortstate *)NODE->psortstate)->BytesWritten += (TUP)->t_len, \
BufFileWrite(FP, (char *)TUP, (TUP)->t_len), \
BufFileWrite(FP, (char *)&((TUP)->t_len), sizeof(tlendummy)), \
PS(NODE)->BytesWritten += (TUP)->t_len, \
LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void*)(TUP), (TUP)->t_len), \
LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void*)&((TUP)->t_len), sizeof(tlendummy)), \
(TUP)->t_len -= HEAPTUPLESIZE \
)
#define ENDRUN(FP) BufFileWrite(FP, (char *)&tlenzero, sizeof(tlenzero))
#define GETLEN(LEN, FP) BufFileRead(FP, (char *)&(LEN), sizeof(tlenzero))
#define ALLOCTUP(LEN) ((HeapTuple)palloc((unsigned)LEN))
#define FREE(x) pfree((char *) x)
#define GETTUP(NODE, TUP, LEN, FP) \
( \
IncrProcessed(), \
((Psortstate *)NODE->psortstate)->BytesRead += (LEN) - sizeof(tlenzero), \
BufFileRead(FP, (char *)(TUP) + sizeof(tlenzero), (LEN) - sizeof(tlenzero)), \
(TUP)->t_data = (HeapTupleHeader) ((char *)(TUP) + HEAPTUPLESIZE), \
BufFileRead(FP, (char *)&tlendummy, sizeof(tlendummy)) \
)
#define ENDRUN(NODE, TAPE) \
LogicalTapeWrite(PS(NODE)->tapeset, (TAPE), (void *)&tlenzero, sizeof(tlenzero))
#define SETTUPLEN(TUP, LEN) ((TUP)->t_len = (LEN) - HEAPTUPLESIZE)
#define TRYGETLEN(NODE, LEN, TAPE) \
(LogicalTapeRead(PS(NODE)->tapeset, (TAPE), \
(void *) &(LEN), sizeof(tlenzero)) == sizeof(tlenzero) \
&& (LEN) != 0)
#define rewind(FP) BufFileSeek(FP, 0, 0L, SEEK_SET)
#define GETLEN(NODE, LEN, TAPE) \
do { \
if (! TRYGETLEN(NODE, LEN, TAPE)) \
elog(ERROR, "psort: unexpected end of data"); \
} while(0)
static void GETTUP(Sort *node, HeapTuple tup, unsigned int len, int tape)
{
IncrProcessed();
PS(node)->BytesRead += len;
if (LogicalTapeRead(PS(node)->tapeset, tape,
((char *) tup) + sizeof(tlenzero),
len - sizeof(tlenzero)) != len - sizeof(tlenzero))
elog(ERROR, "psort: unexpected end of data");
tup->t_len = len - HEAPTUPLESIZE;
tup->t_data = (HeapTupleHeader) ((char *) tup + HEAPTUPLESIZE);
if (LogicalTapeRead(PS(node)->tapeset, tape,
(void *) &tlendummy,
sizeof(tlendummy)) != sizeof(tlendummy))
elog(ERROR, "psort: unexpected end of data");
}
#define ALLOCTUP(LEN) ((HeapTuple) palloc(LEN))
#define FREE(x) pfree((char *) (x))
/*
* USEMEM - record use of memory FREEMEM - record
@ -268,10 +288,10 @@ inittapes(Sort *node)
static void
initialrun(Sort *node)
{
/* struct tuple *tup; */
struct tape *tp;
int baseruns; /* D:(a) */
int extrapasses; /* EOF */
int tapenum;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
@ -284,8 +304,8 @@ initialrun(Sort *node)
extrapasses = 0;
}
else
/* all tuples fetched */
{
/* all tuples fetched */
if (!PS(node)->using_tape_files) /* empty or sorted in
* memory */
return;
@ -297,8 +317,9 @@ initialrun(Sort *node)
*/
if (PS(node)->Tuples == NULL)
{
PS(node)->psort_grab_file = PS(node)->Tape->tp_file;
rewind(PS(node)->psort_grab_file);
PS(node)->psort_grab_tape = PS(node)->Tape[0].tp_tapenum;
/* freeze and rewind the finished output tape */
LogicalTapeFreeze(PS(node)->tapeset, PS(node)->psort_grab_tape);
return;
}
extrapasses = 2;
@ -334,19 +355,20 @@ initialrun(Sort *node)
{
if (--extrapasses)
{
dumptuples(tp->tp_file, node);
ENDRUN(tp->tp_file);
dumptuples(node, tp->tp_tapenum);
ENDRUN(node, tp->tp_tapenum);
continue;
}
else
break;
}
if ((bool) createrun(node, tp->tp_file) == false)
if (createrun(node, tp->tp_tapenum) == false)
extrapasses = 1 + (PS(node)->Tuples != NULL);
/* D2 */
}
for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
rewind(tp->tp_file); /* D. */
/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < PS(node)->TapeRange; tapenum++)
LogicalTapeRewind(PS(node)->tapeset, tapenum, false);
}
/*
@ -374,7 +396,7 @@ createfirstrun(Sort *node)
Assert(PS(node)->memtuples == NULL);
Assert(PS(node)->tupcount == 0);
if (LACKMEM(node))
elog(ERROR, "psort: LACKMEM in createfirstrun");
elog(ERROR, "psort: LACKMEM before createfirstrun");
memtuples = palloc(t_free * sizeof(HeapTuple));
@ -439,7 +461,7 @@ createfirstrun(Sort *node)
for (t = t_last - 1; t >= 0; t--)
puttuple(&PS(node)->Tuples, memtuples[t], 0, &PS(node)->treeContext);
pfree(memtuples);
foundeor = !createrun(node, PS(node)->Tape->tp_file);
foundeor = ! createrun(node, PS(node)->Tape->tp_tapenum);
}
else
{
@ -451,8 +473,10 @@ createfirstrun(Sort *node)
}
/*
* createrun - places the next run on file, grabbing the tuples by
* executing the subplan passed in
* createrun
*
* Create the next run and write it to desttapenum, grabbing the tuples by
* executing the subplan passed in
*
* Uses:
* Tuples, which should contain any tuples for this run
@ -462,7 +486,7 @@ createfirstrun(Sort *node)
* Tuples contains the tuples for the following run upon exit
*/
static bool
createrun(Sort *node, BufFile *file)
createrun(Sort *node, int desttapenum)
{
HeapTuple lasttuple;
HeapTuple tup;
@ -492,7 +516,7 @@ createrun(Sort *node, BufFile *file)
}
lasttuple = gettuple(&PS(node)->Tuples, &junk,
&PS(node)->treeContext);
PUTTUP(node, lasttuple, file);
PUTTUP(node, lasttuple, desttapenum);
TRACEOUT(createrun, lasttuple);
}
@ -545,8 +569,8 @@ createrun(Sort *node, BufFile *file)
FREE(lasttuple);
TRACEMEM(createrun);
}
dumptuples(file, node);
ENDRUN(file); /* delimit the end of the run */
dumptuples(node, desttapenum);
ENDRUN(node, desttapenum); /* delimit the end of the run */
t_last++;
/* put tuples for the next run into leftist tree */
@ -573,28 +597,31 @@ createrun(Sort *node, BufFile *file)
* (polyphase merge Alg.D(D6)--Knuth, Vol.3, p271)
*
* Returns:
* file of tuples in order
* tape number of finished tape containing all tuples in order
*/
static BufFile *
static int
mergeruns(Sort *node)
{
struct tape *tp;
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
Assert(PS(node)->using_tape_files == true);
Assert(PS(node)->using_tape_files);
tp = PS(node)->Tape + PS(node)->TapeRange;
merge(node, tp);
rewind(tp->tp_file);
while (--PS(node)->Level != 0)
{
/* rewind output tape to use as new input */
LogicalTapeRewind(PS(node)->tapeset, tp->tp_tapenum, false);
tp = tp->tp_prev;
rewind(tp->tp_file);
/* rewind new output tape and prepare it for write pass */
LogicalTapeRewind(PS(node)->tapeset, tp->tp_tapenum, true);
merge(node, tp);
rewind(tp->tp_file);
}
return tp->tp_file;
/* freeze and rewind the final output tape */
LogicalTapeFreeze(PS(node)->tapeset, tp->tp_tapenum);
return tp->tp_tapenum;
}
/*
@ -608,7 +635,7 @@ merge(Sort *node, struct tape * dest)
struct tape *lasttp; /* (TAPE[P]) */
struct tape *tp;
struct leftist *tuples;
BufFile *destfile;
int desttapenum;
int times; /* runs left to merge */
int outdummy; /* complete dummy runs */
short fromtape;
@ -616,7 +643,7 @@ merge(Sort *node, struct tape * dest)
Assert(node != (Sort *) NULL);
Assert(PS(node) != (Psortstate *) NULL);
Assert(PS(node)->using_tape_files == true);
Assert(PS(node)->using_tape_files);
lasttp = dest->tp_prev;
times = lasttp->tp_fib;
@ -641,19 +668,18 @@ merge(Sort *node, struct tape * dest)
/* do not add the outdummy runs yet */
times -= outdummy;
}
destfile = dest->tp_file;
desttapenum = dest->tp_tapenum;
while (times-- != 0)
{ /* merge one run */
tuples = NULL;
if (PS(node)->TotalDummy == 0)
for (tp = dest->tp_prev; tp != dest; tp = tp->tp_prev)
{
GETLEN(tuplen, tp->tp_file);
GETLEN(node, tuplen, tp->tp_tapenum);
tup = ALLOCTUP(tuplen);
USEMEM(node, tuplen);
TRACEMEM(merge);
SETTUPLEN(tup, tuplen);
GETTUP(node, tup, tuplen, tp->tp_file);
GETTUP(node, tup, tuplen, tp->tp_tapenum);
puttuple(&tuples, tup, tp - PS(node)->Tape,
&PS(node)->treeContext);
}
@ -668,12 +694,11 @@ merge(Sort *node, struct tape * dest)
}
else
{
GETLEN(tuplen, tp->tp_file);
GETLEN(node, tuplen, tp->tp_tapenum);
tup = ALLOCTUP(tuplen);
USEMEM(node, tuplen);
TRACEMEM(merge);
SETTUPLEN(tup, tuplen);
GETTUP(node, tup, tuplen, tp->tp_file);
GETTUP(node, tup, tuplen, tp->tp_tapenum);
puttuple(&tuples, tup, tp - PS(node)->Tape,
&PS(node)->treeContext);
}
@ -683,38 +708,34 @@ merge(Sort *node, struct tape * dest)
{
/* possible optimization by using count in tuples */
tup = gettuple(&tuples, &fromtape, &PS(node)->treeContext);
PUTTUP(node, tup, destfile);
PUTTUP(node, tup, desttapenum);
FREEMEM(node, tup->t_len);
FREE(tup);
TRACEMEM(merge);
GETLEN(tuplen, PS(node)->Tape[fromtape].tp_file);
if (tuplen == 0)
;
else
if (TRYGETLEN(node, tuplen, PS(node)->Tape[fromtape].tp_tapenum))
{
tup = ALLOCTUP(tuplen);
USEMEM(node, tuplen);
TRACEMEM(merge);
SETTUPLEN(tup, tuplen);
GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_file);
GETTUP(node, tup, tuplen, PS(node)->Tape[fromtape].tp_tapenum);
puttuple(&tuples, tup, fromtape, &PS(node)->treeContext);
}
}
ENDRUN(destfile);
ENDRUN(node, desttapenum);
}
PS(node)->TotalDummy += outdummy;
}
/*
* dumptuples - stores all the tuples in tree into file
* dumptuples - stores all the tuples remaining in tree to dest tape
*/
static void
dumptuples(BufFile *file, Sort *node)
dumptuples(Sort *node, int desttapenum)
{
LeftistContext context = &PS(node)->treeContext;
struct leftist **treep = &PS(node)->Tuples;
struct leftist *tp;
struct leftist *newp;
struct leftist **treep = &PS(node)->Tuples;
LeftistContext context = &PS(node)->treeContext;
HeapTuple tup;
Assert(PS(node)->using_tape_files);
@ -728,7 +749,7 @@ dumptuples(BufFile *file, Sort *node)
else
newp = lmerge(tp->lt_left, tp->lt_right, context);
pfree(tp);
PUTTUP(node, tup, file);
PUTTUP(node, tup, desttapenum);
FREEMEM(node, tup->t_len);
FREE(tup);
@ -760,11 +781,10 @@ psort_grabtuple(Sort *node, bool *should_free)
{
if (PS(node)->all_fetched)
return NULL;
if (GETLEN(tuplen, PS(node)->psort_grab_file) && tuplen != 0)
if (TRYGETLEN(node, tuplen, PS(node)->psort_grab_tape))
{
tup = ALLOCTUP(tuplen);
SETTUPLEN(tup, tuplen);
GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
GETTUP(node, tup, tuplen, PS(node)->psort_grab_tape);
return tup;
}
else
@ -786,10 +806,11 @@ psort_grabtuple(Sort *node, bool *should_free)
* length word. If seek fails we must have a completely empty
* file.
*/
if (BufFileSeek(PS(node)->psort_grab_file, 0,
- (long) (2 * sizeof(tlendummy)), SEEK_CUR))
if (! LogicalTapeBackspace(PS(node)->tapeset,
PS(node)->psort_grab_tape,
2 * sizeof(tlendummy)))
return NULL;
GETLEN(tuplen, PS(node)->psort_grab_file);
GETLEN(node, tuplen, PS(node)->psort_grab_tape);
PS(node)->all_fetched = false;
}
else
@ -798,28 +819,29 @@ psort_grabtuple(Sort *node, bool *should_free)
* Back up and fetch prev tuple's ending length word.
* If seek fails, assume we are at start of file.
*/
if (BufFileSeek(PS(node)->psort_grab_file, 0,
- (long) sizeof(tlendummy), SEEK_CUR))
if (! LogicalTapeBackspace(PS(node)->tapeset,
PS(node)->psort_grab_tape,
sizeof(tlendummy)))
return NULL;
GETLEN(tuplen, PS(node)->psort_grab_file);
if (tuplen == 0)
elog(ERROR, "psort_grabtuple: tuplen is 0 in backward scan");
GETLEN(node, tuplen, PS(node)->psort_grab_tape);
/*
* Back up to get ending length word of tuple before it.
*/
if (BufFileSeek(PS(node)->psort_grab_file, 0,
- (long) (tuplen + 2*sizeof(tlendummy)), SEEK_CUR))
if (! LogicalTapeBackspace(PS(node)->tapeset,
PS(node)->psort_grab_tape,
tuplen + 2*sizeof(tlendummy)))
{
/* If fail, presumably the prev tuple is the first in the file.
* Back up so that it becomes next to read in forward direction
* (not obviously right, but that is what in-memory case does)
*/
if (BufFileSeek(PS(node)->psort_grab_file, 0,
- (long) (tuplen + sizeof(tlendummy)), SEEK_CUR))
if (! LogicalTapeBackspace(PS(node)->tapeset,
PS(node)->psort_grab_tape,
tuplen + sizeof(tlendummy)))
elog(ERROR, "psort_grabtuple: too big last tuple len in backward scan");
return NULL;
}
GETLEN(tuplen, PS(node)->psort_grab_file);
GETLEN(node, tuplen, PS(node)->psort_grab_tape);
}
/*
@ -827,12 +849,12 @@ psort_grabtuple(Sort *node, bool *should_free)
* Note: GETTUP expects we are positioned after the initial length
* word of the tuple, so back up to that point.
*/
if (BufFileSeek(PS(node)->psort_grab_file, 0,
- (long) tuplen, SEEK_CUR))
if (! LogicalTapeBackspace(PS(node)->tapeset,
PS(node)->psort_grab_tape,
tuplen))
elog(ERROR, "psort_grabtuple: too big tuple len in backward scan");
tup = ALLOCTUP(tuplen);
SETTUPLEN(tup, tuplen);
GETTUP(node, tup, tuplen, PS(node)->psort_grab_file);
GETTUP(node, tup, tuplen, PS(node)->psort_grab_tape);
return tup;
}
else
@ -880,9 +902,10 @@ psort_markpos(Sort *node)
Assert(PS(node) != (Psortstate *) NULL);
if (PS(node)->using_tape_files == true)
BufFileTell(PS(node)->psort_grab_file,
& PS(node)->psort_saved_fileno,
& PS(node)->psort_saved);
LogicalTapeTell(PS(node)->tapeset,
PS(node)->psort_grab_tape,
& PS(node)->psort_saved,
& PS(node)->psort_saved_offset);
else
PS(node)->psort_saved = PS(node)->psort_current;
}
@ -898,46 +921,41 @@ psort_restorepos(Sort *node)
Assert(PS(node) != (Psortstate *) NULL);
if (PS(node)->using_tape_files == true)
BufFileSeek(PS(node)->psort_grab_file,
PS(node)->psort_saved_fileno,
PS(node)->psort_saved,
SEEK_SET);
{
if (! LogicalTapeSeek(PS(node)->tapeset,
PS(node)->psort_grab_tape,
PS(node)->psort_saved,
PS(node)->psort_saved_offset))
elog(ERROR, "psort_restorepos failed");
}
else
PS(node)->psort_current = PS(node)->psort_saved;
}
/*
* psort_end - unlinks the tape files, and cleans up. Should not be
* called unless psort_grabtuple has returned a NULL.
* psort_end
*
* Release resources and clean up.
*/
void
psort_end(Sort *node)
{
struct tape *tp;
if (!node->cleaned)
/* node->cleaned is probably redundant? */
if (!node->cleaned && PS(node) != (Psortstate *) NULL)
{
if (PS(node)->tapeset)
LogicalTapeSetClose(PS(node)->tapeset);
if (PS(node)->memtuples)
pfree(PS(node)->memtuples);
/*
* I'm changing this because if we are sorting a relation with no
* tuples, psortstate is NULL.
*/
if (PS(node) != (Psortstate *) NULL)
{
if (PS(node)->using_tape_files == true)
for (tp = PS(node)->Tape + PS(node)->TapeRange; tp >= PS(node)->Tape; tp--)
destroytape(tp->tp_file);
else if (PS(node)->memtuples)
pfree(PS(node)->memtuples);
/* XXX what about freeing leftist tree and tuples in memory? */
NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ);
NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ);
NDirectFileRead += (int) ceil((double) PS(node)->BytesRead / BLCKSZ);
NDirectFileWrite += (int) ceil((double) PS(node)->BytesWritten / BLCKSZ);
pfree((void *) node->psortstate);
node->psortstate = NULL;
node->cleaned = TRUE;
}
pfree((void *) node->psortstate);
node->psortstate = NULL;
node->cleaned = TRUE;
}
}
@ -951,46 +969,22 @@ psort_rescan(Sort *node)
if (((Plan *) node)->lefttree->chgParam != NULL)
{
psort_end(node);
node->cleaned = false;
node->cleaned = false; /* huh? */
}
else if (PS(node) != (Psortstate *) NULL)
{
PS(node)->all_fetched = false;
PS(node)->psort_current = 0;
PS(node)->psort_saved_fileno = 0;
PS(node)->psort_saved = 0L;
PS(node)->psort_saved_offset = 0;
if (PS(node)->using_tape_files == true)
rewind(PS(node)->psort_grab_file);
LogicalTapeRewind(PS(node)->tapeset,
PS(node)->psort_grab_tape,
false);
}
}
/*
* gettape - returns an open stream for writing/reading
*
* Returns:
* Open stream for writing/reading.
* NULL if unable to open temporary file.
*
* There used to be a lot of cruft here to try to ensure that we destroyed
* all the tape files; but it didn't really work. Now we rely on fd.c to
* clean up temp files if an error occurs.
*/
static BufFile *
gettape()
{
return BufFileCreateTemp();
}
/*
* destroytape - unlinks the tape
*/
static void
destroytape(BufFile *file)
{
BufFileClose(file);
}
static int
_psort_cmp(HeapTuple *ltup, HeapTuple *rtup)
{

View File

@ -17,7 +17,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: buffile.h,v 1.1 1999/10/13 15:02:32 tgl Exp $
* $Id: buffile.h,v 1.2 1999/10/16 19:49:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@ -37,11 +37,12 @@ typedef struct BufFile BufFile;
extern BufFile *BufFileCreateTemp(void);
extern BufFile *BufFileCreate(File file);
extern BufFile *BufFileReaccess(BufFile *file);
extern void BufFileClose(BufFile *file);
extern size_t BufFileRead(BufFile *file, void *ptr, size_t size);
extern size_t BufFileWrite(BufFile *file, void *ptr, size_t size);
extern int BufFileSeek(BufFile *file, int fileno, long offset, int whence);
extern void BufFileTell(BufFile *file, int *fileno, long *offset);
extern int BufFileSeekBlock(BufFile *file, long blknum);
extern long BufFileTellBlock(BufFile *file);
#endif /* BUFFILE_H */

View File

@ -0,0 +1,41 @@
/*-------------------------------------------------------------------------
*
* logtape.h
* Management of "logical tapes" within temporary files.
*
* See logtape.c for explanations.
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: logtape.h,v 1.1 1999/10/16 19:49:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef LOGTAPE_H
#define LOGTAPE_H
/* LogicalTapeSet is an opaque type whose details are not known outside logtape.c. */
typedef struct LogicalTapeSet LogicalTapeSet;
/*
* prototypes for functions in logtape.c
*/
extern LogicalTapeSet *LogicalTapeSetCreate(int ntapes);
extern void LogicalTapeSetClose(LogicalTapeSet *lts);
extern size_t LogicalTapeRead(LogicalTapeSet *lts, int tapenum,
void *ptr, size_t size);
extern void LogicalTapeWrite(LogicalTapeSet *lts, int tapenum,
void *ptr, size_t size);
extern void LogicalTapeRewind(LogicalTapeSet *lts, int tapenum, bool forWrite);
extern void LogicalTapeFreeze(LogicalTapeSet *lts, int tapenum);
extern bool LogicalTapeBackspace(LogicalTapeSet *lts, int tapenum,
size_t size);
extern bool LogicalTapeSeek(LogicalTapeSet *lts, int tapenum,
long blocknum, int offset);
extern void LogicalTapeTell(LogicalTapeSet *lts, int tapenum,
long *blocknum, int *offset);
#endif /* LOGTAPE_H */

View File

@ -1,101 +1,21 @@
/*-------------------------------------------------------------------------
*
* psort.h
*
*
* Polyphase merge sort.
*
* Copyright (c) 1994, Regents of the University of California
*
* $Id: psort.h,v 1.22 1999/10/13 15:02:28 tgl Exp $
* $Id: psort.h,v 1.23 1999/10/16 19:49:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#ifndef PSORT_H
#define PSORT_H
#include "access/relscan.h"
#include "access/htup.h"
#include "access/skey.h"
#include "nodes/plannodes.h"
#include "storage/buffile.h"
#include "utils/lselect.h"
#define MAXTAPES 7 /* See Knuth Fig. 70, p273 */
struct tape
{
int tp_dummy; /* (D) */
int tp_fib; /* (A) */
BufFile *tp_file; /* (TAPE) */
struct tape *tp_prev;
};
struct cmplist
{
int cp_attn; /* attribute number */
int cp_num; /* comparison function code */
int cp_rev; /* invert comparison flag */
struct cmplist *cp_next; /* next in chain */
};
/* This structure preserves the state of psort between calls from different
* nodes to its interface functions. Basically, it includes all of the global
* variables in psort. In case you were wondering, pointers to these structures
* are included in Sort node structures. -Rex 2.6.1995
*/
typedef struct Psortstate
{
LeftistContextData treeContext;
int TapeRange;
int Level;
int TotalDummy;
struct tape Tape[MAXTAPES];
int BytesRead;
int BytesWritten;
int tupcount;
struct leftist *Tuples;
BufFile *psort_grab_file;
long psort_current; /* array index (only used if not tape) */
int psort_saved_fileno; /* upper bits of psort_saved, if tape */
long psort_saved; /* could be file offset, or array index */
bool using_tape_files;
bool all_fetched; /* this is for cursors */
HeapTuple *memtuples;
} Psortstate;
#ifdef EBUG
#include "storage/buf.h"
#include "storage/bufmgr.h"
#define PDEBUG(PROC, S1)\
elog(DEBUG, "%s:%d>> PROC: %s.", __FILE__, __LINE__, S1)
#define PDEBUG2(PROC, S1, D1)\
elog(DEBUG, "%s:%d>> PROC: %s %d.", __FILE__, __LINE__, S1, D1)
#define PDEBUG4(PROC, S1, D1, S2, D2)\
elog(DEBUG, "%s:%d>> PROC: %s %d, %s %d.", __FILE__, __LINE__, S1, D1, S2, D2)
#define VDEBUG(VAR, FMT)\
elog(DEBUG, "%s:%d>> VAR =FMT", __FILE__, __LINE__, VAR)
#define ASSERT(EXPR, STR)\
if (!(EXPR)) elog(FATAL, "%s:%d>> %s", __FILE__, __LINE__, STR)
#define TRACE(VAL, CODE)\
if (1) CODE; else
#else
#define PDEBUG(MSG)
#define VDEBUG(VAR, FMT)
#define ASSERT(EXPR, MSG)
#define TRACE(VAL, CODE)
#endif
/* psort.c */
extern bool psort_begin(Sort *node, int nkeys, ScanKey key);
extern HeapTuple psort_grabtuple(Sort *node, bool *should_free);
extern void psort_markpos(Sort *node);