*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.113 2001/03/25 23:23:58 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.114 2001/05/12 19:58:27 tgl Exp $
*
*
* INTERFACE ROUTINES
return;
}
- *buffer = ReleaseAndReadBuffer(*buffer, relation, page);
+ *buffer = ReleaseAndReadBuffer(*buffer, relation, page, false);
if (!BufferIsValid(*buffer))
elog(ERROR, "heapgettup: failed ReadBuffer");
*
*
* IDENTIFICATION
- * $Id: hio.c,v 1.37 2001/03/22 06:16:07 momjian Exp $
+ * $Id: hio.c,v 1.38 2001/05/12 19:58:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
/*
* RelationGetBufferForTuple
*
- * Returns (locked) buffer with free space >= given len.
+ * Returns exclusive-locked buffer with free space >= given len.
*
* Note that we use LockPage to lock relation for extension. We can
* do this as long as in all other places we use page-level locking
*
* ELOG(ERROR) is allowed here, so this routine *must* be called
* before any (unlogged) changes are made in buffer pool.
- *
*/
Buffer
RelationGetBufferForTuple(Relation relation, Size len)
{
- Buffer buffer;
+ Buffer buffer = InvalidBuffer;
Page pageHeader;
- BlockNumber lastblock;
+ BlockNumber lastblock,
+ oldnblocks;
len = MAXALIGN(len); /* be conservative */
elog(ERROR, "Tuple is too big: size %lu, max size %ld",
(unsigned long) len, MaxTupleSize);
- if (!relation->rd_myxactonly)
- LockPage(relation, 0, ExclusiveLock);
-
/*
- * XXX This does an lseek - VERY expensive - but at the moment it is
- * the only way to accurately determine how many blocks are in a
- * relation. A good optimization would be to get this to actually
- * work properly.
+ * First, use relcache's record of table length to guess where the
+ * last page is, and try to put the tuple there. This cached value
+ * may be out of date, in which case we'll be inserting into a non-last
+ * page, but that should be OK. Note that in a newly created relcache
+ * entry, rd_nblocks may be zero; if so, we'll set it correctly below.
*/
- lastblock = RelationGetNumberOfBlocks(relation);
-
- /*
- * Get the last existing page --- may need to create the first one if
- * this is a virgin relation.
- */
- if (lastblock == 0)
- {
- buffer = ReadBuffer(relation, P_NEW);
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- pageHeader = (Page) BufferGetPage(buffer);
- Assert(PageIsNew((PageHeader) pageHeader));
- PageInit(pageHeader, BufferGetPageSize(buffer), 0);
- }
- else
+ if (relation->rd_nblocks > 0)
{
- buffer = ReadBuffer(relation, lastblock - 1);
+ lastblock = relation->rd_nblocks - 1;
+ buffer = ReadBuffer(relation, lastblock);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pageHeader = (Page) BufferGetPage(buffer);
+ if (len <= PageGetFreeSpace(pageHeader))
+ return buffer;
+ /*
+ * Doesn't fit, so we'll have to try someplace else.
+ */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ /* buffer release will happen below... */
}
/*
- * Is there room on the last existing page?
+ * Before extending relation, make sure no one else has done
+ * so more recently than our last rd_nblocks update. (If we
+ * blindly extend the relation here, then probably most of the
+ * page the other guy added will end up going to waste.)
+ *
+ * We have to use a lock to ensure no one else is extending the
+ * rel at the same time, else we will both try to initialize the
+ * same new page.
*/
- if (len > PageGetFreeSpace(pageHeader))
+ if (!relation->rd_myxactonly)
+ LockPage(relation, 0, ExclusiveLock);
+
+ oldnblocks = relation->rd_nblocks;
+ /*
+ * XXX This does an lseek - rather expensive - but at the moment it is
+ * the only way to accurately determine how many blocks are in a
+ * relation. Is it worth keeping an accurate file length in shared
+ * memory someplace, rather than relying on the kernel to do it for us?
+ */
+ relation->rd_nblocks = RelationGetNumberOfBlocks(relation);
+
+ if (relation->rd_nblocks > oldnblocks)
{
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW);
+ /*
+ * Someone else has indeed extended the relation recently.
+ * Try to fit our tuple into the new last page.
+ */
+ lastblock = relation->rd_nblocks - 1;
+ buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
pageHeader = (Page) BufferGetPage(buffer);
- Assert(PageIsNew((PageHeader) pageHeader));
- PageInit(pageHeader, BufferGetPageSize(buffer), 0);
-
- if (len > PageGetFreeSpace(pageHeader))
+ if (len <= PageGetFreeSpace(pageHeader))
{
- /* We should not get here given the test at the top */
- elog(STOP, "Tuple is too big: size %lu",
- (unsigned long) len);
+ /* OK, we don't need to extend again. */
+ if (!relation->rd_myxactonly)
+ UnlockPage(relation, 0, ExclusiveLock);
+ return buffer;
}
+ /*
+ * Doesn't fit, so we'll have to extend the relation (again).
+ */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ /* buffer release will happen below... */
}
+ /*
+ * Extend the relation by one page and update rd_nblocks for next time.
+ */
+ lastblock = relation->rd_nblocks;
+ buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, true);
+ relation->rd_nblocks = lastblock + 1;
+
+ /*
+ * We need to initialize the empty new page.
+ */
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ pageHeader = (Page) BufferGetPage(buffer);
+ Assert(PageIsNew((PageHeader) pageHeader));
+ PageInit(pageHeader, BufferGetPageSize(buffer), 0);
+
+ /*
+ * Release the file-extension lock; it's now OK for someone else
+ * to extend the relation some more.
+ */
if (!relation->rd_myxactonly)
UnlockPage(relation, 0, ExclusiveLock);
- return (buffer);
+ if (len > PageGetFreeSpace(pageHeader))
+ {
+ /* We should not get here given the test at the top */
+ elog(STOP, "Tuple is too big: size %lu",
+ (unsigned long) len);
+ }
+ return buffer;
}
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.110 2001/05/10 20:38:49 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.111 2001/05/12 19:58:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
*/
#define BUFFER_IS_BROKEN(buf) ((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY))
-static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
- bool bufferLockHeld);
+static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum,
+ bool isExtend, bool bufferLockHeld);
static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
- bool *foundPtr, bool bufferLockHeld);
+ bool *foundPtr);
static int ReleaseBufferWithBufferLock(Buffer buffer);
static int BufferReplace(BufferDesc *bufHdr);
void PrintBufferDescs(void);
SpinRelease(BufMgrLock);
return buffer;
}
- return ReadBufferWithBufferLock(relation, blockNumber, true);
+ return ReadBufferInternal(relation, blockNumber, false, true);
}
else
{
return buffer;
}
}
- return ReadBuffer(relation, blockNumber);
+ return ReadBufferInternal(relation, blockNumber, false, false);
}
/*
/*
* ReadBuffer
- *
*/
Buffer
ReadBuffer(Relation reln, BlockNumber blockNum)
{
- return ReadBufferWithBufferLock(reln, blockNum, false);
+ return ReadBufferInternal(reln, blockNum, false, false);
}
/*
- * ReadBufferWithBufferLock -- does the work of
- * ReadBuffer() but with the possibility that
- * the buffer lock has already been held. this
- * is yet another effort to reduce the number of
- * semops in the system.
+ * ReadBufferInternal -- internal version of ReadBuffer with more options
+ *
+ * isExtend: if true, assume that we are extending the file and the caller
+ * is passing the current EOF block number (ie, caller already called
+ * smgrnblocks()).
+ *
+ * bufferLockHeld: if true, caller already acquired the bufmgr spinlock.
+ * (This is assumed never to be true if dealing with a local buffer!)
*/
static Buffer
-ReadBufferWithBufferLock(Relation reln,
- BlockNumber blockNum,
- bool bufferLockHeld)
+ReadBufferInternal(Relation reln, BlockNumber blockNum,
+ bool isExtend, bool bufferLockHeld)
{
BufferDesc *bufHdr;
int status;
bool found;
- bool extend; /* extending the file by one block */
bool isLocalBuf;
- extend = (blockNum == P_NEW);
isLocalBuf = reln->rd_myxactonly;
if (isLocalBuf)
{
ReadLocalBufferCount++;
+ /* Substitute proper block number if caller asked for P_NEW */
+ if (blockNum == P_NEW)
+ {
+ blockNum = reln->rd_nblocks;
+ reln->rd_nblocks++;
+ isExtend = true;
+ }
bufHdr = LocalBufferAlloc(reln, blockNum, &found);
if (found)
LocalBufferHitCount++;
else
{
ReadBufferCount++;
-
+ /* Substitute proper block number if caller asked for P_NEW */
+ if (blockNum == P_NEW)
+ {
+ blockNum = smgrnblocks(DEFAULT_SMGR, reln);
+ isExtend = true;
+ }
/*
* lookup the buffer. IO_IN_PROGRESS is set if the requested
* block is not currently in memory.
*/
- bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
+ if (!bufferLockHeld)
+ SpinAcquire(BufMgrLock);
+ bufHdr = BufferAlloc(reln, blockNum, &found);
if (found)
BufferHitCount++;
}
+ /* At this point we do NOT hold the bufmgr spinlock. */
+
if (!bufHdr)
return InvalidBuffer;
if (found)
{
/*
- * Could see found && extend if a buffer was already created for
+ * Could have found && isExtend if a buffer was already created for
* the next page position, but then smgrextend failed to write
* the page. Must fall through and try to extend file again.
*/
- if (!extend)
+ if (!isExtend)
return BufferDescriptorGetBuffer(bufHdr);
}
* if we have gotten to this point, the reln pointer must be ok and
* the relation file must be open.
*/
- if (extend)
+ if (isExtend)
{
/* new buffers are zero-filled */
MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
- status = smgrextend(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
+ status = smgrextend(DEFAULT_SMGR, reln, blockNum,
(char *) MAKE_PTR(bufHdr->data));
}
else
{
- status = smgrread(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
+ status = smgrread(DEFAULT_SMGR, reln, blockNum,
(char *) MAKE_PTR(bufHdr->data));
}
*
* Returns: descriptor for buffer
*
- * When this routine returns, the BufMgrLock is guaranteed NOT be held.
+ * BufMgrLock must be held at entry. When this routine returns,
+ * the BufMgrLock is guaranteed NOT to be held.
*/
static BufferDesc *
BufferAlloc(Relation reln,
BlockNumber blockNum,
- bool *foundPtr,
- bool bufferLockHeld)
+ bool *foundPtr)
{
BufferDesc *buf,
*buf2;
/* create a new tag so we can lookup the buffer */
/* assume that the relation is already open */
- if (blockNum == P_NEW)
- {
- blockNum = smgrnblocks(DEFAULT_SMGR, reln);
- }
-
INIT_BUFFERTAG(&newTag, reln, blockNum);
- if (!bufferLockHeld)
- SpinAcquire(BufMgrLock);
-
/* see if the block is in the buffer pool already */
buf = BufTableLookup(&newTag);
if (buf != NULL)
#undef ReleaseAndReadBuffer
/*
* ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
- * so that only one semop needs to be called.
+ * to save a spinlock release/acquire.
*
+ * An additional frammish of this routine is that the caller may perform
+ * file extension (as if blockNum = P_NEW) by passing the actual current
+ * EOF block number as blockNum and setting isExtend true. This hack
+ * allows us to avoid calling smgrnblocks() again when the caller has
+ * already done it.
+ *
+ * Note: it is OK to pass buffer = InvalidBuffer, indicating that no old
+ * buffer actually needs to be released. This case is the same as ReadBuffer
+ * except for the isExtend option.
*/
Buffer
ReleaseAndReadBuffer(Buffer buffer,
Relation relation,
- BlockNumber blockNum)
+ BlockNumber blockNum,
+ bool isExtend)
{
BufferDesc *bufHdr;
- Buffer retbuf;
- if (BufferIsLocal(buffer))
- {
- Assert(LocalRefCount[-buffer - 1] > 0);
- LocalRefCount[-buffer - 1]--;
- }
- else
+ if (BufferIsValid(buffer))
{
- if (BufferIsValid(buffer))
+ if (BufferIsLocal(buffer))
+ {
+ Assert(LocalRefCount[-buffer - 1] > 0);
+ LocalRefCount[-buffer - 1]--;
+ }
+ else
{
bufHdr = &BufferDescriptors[buffer - 1];
Assert(PrivateRefCount[buffer - 1] > 0);
AddBufferToFreelist(bufHdr);
bufHdr->flags |= BM_FREE;
}
- retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
- return retbuf;
+ return ReadBufferInternal(relation, blockNum,
+ isExtend, true);
}
}
}
- return ReadBuffer(relation, blockNum);
+ return ReadBufferInternal(relation, blockNum,
+ isExtend, false);
}
/*
int line,
Buffer buffer,
Relation relation,
- BlockNumber blockNum)
+ BlockNumber blockNum,
+ bool isExtend)
{
bool bufferValid;
Buffer b;
bufferValid = BufferIsValid(buffer);
- b = ReleaseAndReadBuffer(buffer, relation, blockNum);
+ b = ReleaseAndReadBuffer(buffer, relation, blockNum, isExtend);
if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
&& is_userbuffer(buffer))
{
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.40 2001/03/22 03:59:44 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.41 2001/05/12 19:58:27 tgl Exp $
*
*-------------------------------------------------------------------------
*/
int i;
BufferDesc *bufHdr = (BufferDesc *) NULL;
- if (blockNum == P_NEW)
- {
- blockNum = reln->rd_nblocks;
- reln->rd_nblocks++;
- }
-
/* a low tech search for now -- not optimized for scans */
for (i = 0; i < NLocBuffer; i++)
{
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: bufmgr.h,v 1.50 2001/03/22 04:01:05 momjian Exp $
+ * $Id: bufmgr.h,v 1.51 2001/05/12 19:58:28 tgl Exp $
*
*-------------------------------------------------------------------------
*/
extern int WriteBuffer(Buffer buffer);
extern int WriteNoReleaseBuffer(Buffer buffer);
extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation,
- BlockNumber blockNum);
+ BlockNumber blockNum, bool isExtend);
extern int FlushBuffer(Buffer buffer, bool sync, bool release);
extern void InitBufferPool(void);