]> granicus.if.org Git - postgresql/commitdiff
Modify RelationGetBufferForTuple() so that we only do lseek and lock
authorTom Lane <tgl@sss.pgh.pa.us>
Sat, 12 May 2001 19:58:28 +0000 (19:58 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Sat, 12 May 2001 19:58:28 +0000 (19:58 +0000)
when we need to move to a new page; as long as we can insert the new
tuple on the same page as before, we only need LockBuffer and not the
expensive stuff.  Also, twiddle bufmgr interfaces to avoid redundant
lseeks in RelationGetBufferForTuple and BufferAlloc.  Successive inserts
now require one lseek per page added, rather than one per tuple with
several additional ones at each page boundary as happened before.
Lock contention when multiple backends are inserting in same table
is also greatly reduced.

src/backend/access/heap/heapam.c
src/backend/access/heap/hio.c
src/backend/storage/buffer/bufmgr.c
src/backend/storage/buffer/localbuf.c
src/include/storage/bufmgr.h

index 49ec63658f2c7cd243ddb0a092334147510732a8..7e9b8970204752bc89a9578b5b15f9170f7b37b5 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.113 2001/03/25 23:23:58 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.114 2001/05/12 19:58:27 tgl Exp $
  *
  *
  * INTERFACE ROUTINES
@@ -487,7 +487,7 @@ heapgettup(Relation relation,
                        return;
                }
 
-               *buffer = ReleaseAndReadBuffer(*buffer, relation, page);
+               *buffer = ReleaseAndReadBuffer(*buffer, relation, page, false);
 
                if (!BufferIsValid(*buffer))
                        elog(ERROR, "heapgettup: failed ReadBuffer");
index b4780c208e04df227bbbdde4d0afd375c16bb7e1..7278ecd2deaba2ed027da01fd3e71d54db7dc027 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Id: hio.c,v 1.37 2001/03/22 06:16:07 momjian Exp $
+ *       $Id: hio.c,v 1.38 2001/05/12 19:58:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -66,7 +66,7 @@ RelationPutHeapTuple(Relation relation,
 /*
  * RelationGetBufferForTuple
  *
- * Returns (locked) buffer with free space >= given len.
+ * Returns exclusive-locked buffer with free space >= given len.
  *
  * Note that we use LockPage to lock relation for extension. We can
  * do this as long as in all other places we use page-level locking
@@ -75,14 +75,14 @@ RelationPutHeapTuple(Relation relation,
  *
  * ELOG(ERROR) is allowed here, so this routine *must* be called
  * before any (unlogged) changes are made in buffer pool.
- *
  */
 Buffer
 RelationGetBufferForTuple(Relation relation, Size len)
 {
-       Buffer          buffer;
+       Buffer          buffer = InvalidBuffer;
        Page            pageHeader;
-       BlockNumber lastblock;
+       BlockNumber lastblock,
+                               oldnblocks;
 
        len = MAXALIGN(len);            /* be conservative */
 
@@ -93,59 +93,102 @@ RelationGetBufferForTuple(Relation relation, Size len)
                elog(ERROR, "Tuple is too big: size %lu, max size %ld",
                         (unsigned long) len, MaxTupleSize);
 
-       if (!relation->rd_myxactonly)
-               LockPage(relation, 0, ExclusiveLock);
-
        /*
-        * XXX This does an lseek - VERY expensive - but at the moment it is
-        * the only way to accurately determine how many blocks are in a
-        * relation.  A good optimization would be to get this to actually
-        * work properly.
+        * First, use relcache's record of table length to guess where the
+        * last page is, and try to put the tuple there.  This cached value
+        * may be out of date, in which case we'll be inserting into a non-last
+        * page, but that should be OK.  Note that in a newly created relcache
+        * entry, rd_nblocks may be zero; if so, we'll set it correctly below.
         */
-       lastblock = RelationGetNumberOfBlocks(relation);
-
-       /*
-        * Get the last existing page --- may need to create the first one if
-        * this is a virgin relation.
-        */
-       if (lastblock == 0)
-       {
-               buffer = ReadBuffer(relation, P_NEW);
-               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-               pageHeader = (Page) BufferGetPage(buffer);
-               Assert(PageIsNew((PageHeader) pageHeader));
-               PageInit(pageHeader, BufferGetPageSize(buffer), 0);
-       }
-       else
+       if (relation->rd_nblocks > 0)
        {
-               buffer = ReadBuffer(relation, lastblock - 1);
+               lastblock = relation->rd_nblocks - 1;
+               buffer = ReadBuffer(relation, lastblock);
                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                pageHeader = (Page) BufferGetPage(buffer);
+               if (len <= PageGetFreeSpace(pageHeader))
+                       return buffer;
+               /*
+                * Doesn't fit, so we'll have to try someplace else.
+                */
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               /* buffer release will happen below... */
        }
 
        /*
-        * Is there room on the last existing page?
+        * Before extending relation, make sure no one else has done
+        * so more recently than our last rd_nblocks update.  (If we
+        * blindly extend the relation here, then probably most of the
+        * page the other guy added will end up going to waste.)
+        *
+        * We have to use a lock to ensure no one else is extending the
+        * rel at the same time, else we will both try to initialize the
+        * same new page.
         */
-       if (len > PageGetFreeSpace(pageHeader))
+       if (!relation->rd_myxactonly)
+               LockPage(relation, 0, ExclusiveLock);
+
+       oldnblocks = relation->rd_nblocks;
+       /*
+        * XXX This does an lseek - rather expensive - but at the moment it is
+        * the only way to accurately determine how many blocks are in a
+        * relation.  Is it worth keeping an accurate file length in shared
+        * memory someplace, rather than relying on the kernel to do it for us?
+        */
+       relation->rd_nblocks = RelationGetNumberOfBlocks(relation);
+
+       if (relation->rd_nblocks > oldnblocks)
        {
-               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
-               buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW);
+               /*
+                * Someone else has indeed extended the relation recently.
+                * Try to fit our tuple into the new last page.
+                */
+               lastblock = relation->rd_nblocks - 1;
+               buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false);
                LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                pageHeader = (Page) BufferGetPage(buffer);
-               Assert(PageIsNew((PageHeader) pageHeader));
-               PageInit(pageHeader, BufferGetPageSize(buffer), 0);
-
-               if (len > PageGetFreeSpace(pageHeader))
+               if (len <= PageGetFreeSpace(pageHeader))
                {
-                       /* We should not get here given the test at the top */
-                       elog(STOP, "Tuple is too big: size %lu",
-                                (unsigned long) len);
+                       /* OK, we don't need to extend again. */
+                       if (!relation->rd_myxactonly)
+                               UnlockPage(relation, 0, ExclusiveLock);
+                       return buffer;
                }
+               /*
+                * Doesn't fit, so we'll have to extend the relation (again).
+                */
+               LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+               /* buffer release will happen below... */
        }
 
+       /*
+        * Extend the relation by one page and update rd_nblocks for next time.
+        */
+       lastblock = relation->rd_nblocks;
+       buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, true);
+       relation->rd_nblocks = lastblock + 1;
+
+       /*
+        * We need to initialize the empty new page.
+        */
+       LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+       pageHeader = (Page) BufferGetPage(buffer);
+       Assert(PageIsNew((PageHeader) pageHeader));
+       PageInit(pageHeader, BufferGetPageSize(buffer), 0);
+
+       /*
+        * Release the file-extension lock; it's now OK for someone else
+        * to extend the relation some more.
+        */
        if (!relation->rd_myxactonly)
                UnlockPage(relation, 0, ExclusiveLock);
 
-       return (buffer);
+       if (len > PageGetFreeSpace(pageHeader))
+       {
+               /* We should not get here given the test at the top */
+               elog(STOP, "Tuple is too big: size %lu",
+                        (unsigned long) len);
+       }
 
+       return buffer;
 }
index b8d3e51b5d538947817ea36d3e41d9d5bbc55b85..d5ff24df0227f19a0a954c71280bb795bd4c3efb 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.110 2001/05/10 20:38:49 tgl Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/bufmgr.c,v 1.111 2001/05/12 19:58:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -88,10 +88,10 @@ extern void AbortBufferIO(void);
 */
 #define BUFFER_IS_BROKEN(buf) ((buf->flags & BM_IO_ERROR) && !(buf->flags & BM_DIRTY))
 
-static Buffer ReadBufferWithBufferLock(Relation relation, BlockNumber blockNum,
-                                                bool bufferLockHeld);
+static Buffer ReadBufferInternal(Relation reln, BlockNumber blockNum,
+                                                                bool isExtend, bool bufferLockHeld);
 static BufferDesc *BufferAlloc(Relation reln, BlockNumber blockNum,
-                       bool *foundPtr, bool bufferLockHeld);
+                                                          bool *foundPtr);
 static int     ReleaseBufferWithBufferLock(Buffer buffer);
 static int     BufferReplace(BufferDesc *bufHdr);
 void           PrintBufferDescs(void);
@@ -121,7 +121,7 @@ RelationGetBufferWithBuffer(Relation relation,
                                SpinRelease(BufMgrLock);
                                return buffer;
                        }
-                       return ReadBufferWithBufferLock(relation, blockNumber, true);
+                       return ReadBufferInternal(relation, blockNumber, false, true);
                }
                else
                {
@@ -131,7 +131,7 @@ RelationGetBufferWithBuffer(Relation relation,
                                return buffer;
                }
        }
-       return ReadBuffer(relation, blockNumber);
+       return ReadBufferInternal(relation, blockNumber, false, false);
 }
 
 /*
@@ -152,38 +152,44 @@ RelationGetBufferWithBuffer(Relation relation,
 
 /*
  * ReadBuffer
- *
  */
 Buffer
 ReadBuffer(Relation reln, BlockNumber blockNum)
 {
-       return ReadBufferWithBufferLock(reln, blockNum, false);
+       return ReadBufferInternal(reln, blockNum, false, false);
 }
 
 /*
- * ReadBufferWithBufferLock -- does the work of
- *             ReadBuffer() but with the possibility that
- *             the buffer lock has already been held. this
- *             is yet another effort to reduce the number of
- *             semops in the system.
+ * ReadBufferInternal -- internal version of ReadBuffer with more options
+ *
+ * isExtend: if true, assume that we are extending the file and the caller
+ * is passing the current EOF block number (ie, caller already called
+ * smgrnblocks()).
+ *
+ * bufferLockHeld: if true, caller already acquired the bufmgr spinlock.
+ * (This is assumed never to be true if dealing with a local buffer!)
  */
 static Buffer
-ReadBufferWithBufferLock(Relation reln,
-                                                BlockNumber blockNum,
-                                                bool bufferLockHeld)
+ReadBufferInternal(Relation reln, BlockNumber blockNum,
+                                  bool isExtend, bool bufferLockHeld)
 {
        BufferDesc *bufHdr;
        int                     status;
        bool            found;
-       bool            extend;                 /* extending the file by one block */
        bool            isLocalBuf;
 
-       extend = (blockNum == P_NEW);
        isLocalBuf = reln->rd_myxactonly;
 
        if (isLocalBuf)
        {
                ReadLocalBufferCount++;
+               /* Substitute proper block number if caller asked for P_NEW */
+               if (blockNum == P_NEW)
+               {
+                       blockNum = reln->rd_nblocks;
+                       reln->rd_nblocks++;
+                       isExtend = true;
+               }
                bufHdr = LocalBufferAlloc(reln, blockNum, &found);
                if (found)
                        LocalBufferHitCount++;
@@ -191,16 +197,25 @@ ReadBufferWithBufferLock(Relation reln,
        else
        {
                ReadBufferCount++;
-
+               /* Substitute proper block number if caller asked for P_NEW */
+               if (blockNum == P_NEW)
+               {
+                       blockNum = smgrnblocks(DEFAULT_SMGR, reln);
+                       isExtend = true;
+               }
                /*
                 * lookup the buffer.  IO_IN_PROGRESS is set if the requested
                 * block is not currently in memory.
                 */
-               bufHdr = BufferAlloc(reln, blockNum, &found, bufferLockHeld);
+               if (!bufferLockHeld)
+                       SpinAcquire(BufMgrLock);
+               bufHdr = BufferAlloc(reln, blockNum, &found);
                if (found)
                        BufferHitCount++;
        }
 
+       /* At this point we do NOT hold the bufmgr spinlock. */
+
        if (!bufHdr)
                return InvalidBuffer;
 
@@ -208,11 +223,11 @@ ReadBufferWithBufferLock(Relation reln,
        if (found)
        {
                /*
-                * Could see found && extend if a buffer was already created for
+                * Could have found && isExtend if a buffer was already created for
                 * the next page position, but then smgrextend failed to write
                 * the page.  Must fall through and try to extend file again.
                 */
-               if (!extend)
+               if (!isExtend)
                        return BufferDescriptorGetBuffer(bufHdr);
        }
 
@@ -220,16 +235,16 @@ ReadBufferWithBufferLock(Relation reln,
         * if we have gotten to this point, the reln pointer must be ok and
         * the relation file must be open.
         */
-       if (extend)
+       if (isExtend)
        {
                /* new buffers are zero-filled */
                MemSet((char *) MAKE_PTR(bufHdr->data), 0, BLCKSZ);
-               status = smgrextend(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
+               status = smgrextend(DEFAULT_SMGR, reln, blockNum,
                                                        (char *) MAKE_PTR(bufHdr->data));
        }
        else
        {
-               status = smgrread(DEFAULT_SMGR, reln, bufHdr->tag.blockNum,
+               status = smgrread(DEFAULT_SMGR, reln, blockNum,
                                                  (char *) MAKE_PTR(bufHdr->data));
        }
 
@@ -290,13 +305,13 @@ ReadBufferWithBufferLock(Relation reln,
  *
  * Returns: descriptor for buffer
  *
- * When this routine returns, the BufMgrLock is guaranteed NOT be held.
+ * BufMgrLock must be held at entry.  When this routine returns,
+ * the BufMgrLock is guaranteed NOT to be held.
  */
 static BufferDesc *
 BufferAlloc(Relation reln,
                        BlockNumber blockNum,
-                       bool *foundPtr,
-                       bool bufferLockHeld)
+                       bool *foundPtr)
 {
        BufferDesc *buf,
                           *buf2;
@@ -305,16 +320,8 @@ BufferAlloc(Relation reln,
 
        /* create a new tag so we can lookup the buffer */
        /* assume that the relation is already open */
-       if (blockNum == P_NEW)
-       {
-               blockNum = smgrnblocks(DEFAULT_SMGR, reln);
-       }
-
        INIT_BUFFERTAG(&newTag, reln, blockNum);
 
-       if (!bufferLockHeld)
-               SpinAcquire(BufMgrLock);
-
        /* see if the block is in the buffer pool already */
        buf = BufTableLookup(&newTag);
        if (buf != NULL)
@@ -666,25 +673,34 @@ WriteNoReleaseBuffer(Buffer buffer)
 #undef ReleaseAndReadBuffer
 /*
  * ReleaseAndReadBuffer -- combine ReleaseBuffer() and ReadBuffer()
- *             so that only one semop needs to be called.
+ *             to save a spinlock release/acquire.
  *
+ * An additional frammish of this routine is that the caller may perform
+ * file extension (as if blockNum = P_NEW) by passing the actual current
+ * EOF block number as blockNum and setting isExtend true.  This hack
+ * allows us to avoid calling smgrnblocks() again when the caller has
+ * already done it.
+ *
+ * Note: it is OK to pass buffer = InvalidBuffer, indicating that no old
+ * buffer actually needs to be released.  This case is the same as ReadBuffer
+ * except for the isExtend option.
  */
 Buffer
 ReleaseAndReadBuffer(Buffer buffer,
                                         Relation relation,
-                                        BlockNumber blockNum)
+                                        BlockNumber blockNum,
+                                        bool isExtend)
 {
        BufferDesc *bufHdr;
-       Buffer          retbuf;
 
-       if (BufferIsLocal(buffer))
-       {
-               Assert(LocalRefCount[-buffer - 1] > 0);
-               LocalRefCount[-buffer - 1]--;
-       }
-       else
+       if (BufferIsValid(buffer))
        {
-               if (BufferIsValid(buffer))
+               if (BufferIsLocal(buffer))
+               {
+                       Assert(LocalRefCount[-buffer - 1] > 0);
+                       LocalRefCount[-buffer - 1]--;
+               }
+               else
                {
                        bufHdr = &BufferDescriptors[buffer - 1];
                        Assert(PrivateRefCount[buffer - 1] > 0);
@@ -701,13 +717,14 @@ ReleaseAndReadBuffer(Buffer buffer,
                                        AddBufferToFreelist(bufHdr);
                                        bufHdr->flags |= BM_FREE;
                                }
-                               retbuf = ReadBufferWithBufferLock(relation, blockNum, true);
-                               return retbuf;
+                               return ReadBufferInternal(relation, blockNum,
+                                                                                 isExtend, true);
                        }
                }
        }
 
-       return ReadBuffer(relation, blockNum);
+       return ReadBufferInternal(relation, blockNum,
+                                                         isExtend, false);
 }
 
 /*
@@ -1735,13 +1752,14 @@ ReleaseAndReadBuffer_Debug(char *file,
                                                   int line,
                                                   Buffer buffer,
                                                   Relation relation,
-                                                  BlockNumber blockNum)
+                                                  BlockNumber blockNum,
+                                                  bool isExtend)
 {
        bool            bufferValid;
        Buffer          b;
 
        bufferValid = BufferIsValid(buffer);
-       b = ReleaseAndReadBuffer(buffer, relation, blockNum);
+       b = ReleaseAndReadBuffer(buffer, relation, blockNum, isExtend);
        if (ShowPinTrace && bufferValid && BufferIsLocal(buffer)
                && is_userbuffer(buffer))
        {
index 6e3cd7564113cde6139b62f7cbc14cdd56df514d..4b4a31066b2cc7df4118b8ce9c5282c54e9df7a8 100644 (file)
@@ -16,7 +16,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.40 2001/03/22 03:59:44 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/buffer/localbuf.c,v 1.41 2001/05/12 19:58:27 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -54,12 +54,6 @@ LocalBufferAlloc(Relation reln, BlockNumber blockNum, bool *foundPtr)
        int                     i;
        BufferDesc *bufHdr = (BufferDesc *) NULL;
 
-       if (blockNum == P_NEW)
-       {
-               blockNum = reln->rd_nblocks;
-               reln->rd_nblocks++;
-       }
-
        /* a low tech search for now -- not optimized for scans */
        for (i = 0; i < NLocBuffer; i++)
        {
index fc76f9fe4c5bd27e9124414675d2e7f33af55128..d45c8888c1d0ebacf3a120762b64591e9dac9ee9 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: bufmgr.h,v 1.50 2001/03/22 04:01:05 momjian Exp $
+ * $Id: bufmgr.h,v 1.51 2001/05/12 19:58:28 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -164,7 +164,7 @@ extern Buffer ReadBuffer(Relation reln, BlockNumber blockNum);
 extern int     WriteBuffer(Buffer buffer);
 extern int     WriteNoReleaseBuffer(Buffer buffer);
 extern Buffer ReleaseAndReadBuffer(Buffer buffer, Relation relation,
-                                        BlockNumber blockNum);
+                                                                  BlockNumber blockNum, bool isExtend);
 extern int     FlushBuffer(Buffer buffer, bool sync, bool release);
 
 extern void InitBufferPool(void);