]> granicus.if.org Git - postgresql/blobdiff - src/backend/access/heap/hio.c
Modify BufferGetPage() to prepare for "snapshot too old" feature
[postgresql] / src / backend / access / heap / hio.c
index 6db73bf9d00774c909970a72992fc4262aa49ed7..a041ca0c756d1dc0816a5e1a98d7a996a10ad1c8 100644 (file)
@@ -3,7 +3,7 @@
  * hio.c
  *       POSTGRES heap access method input/output code.
  *
- * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
@@ -48,7 +48,7 @@ RelationPutHeapTuple(Relation relation,
        Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
 
        /* Add the tuple to the page */
-       pageHeader = BufferGetPage(buffer);
+       pageHeader = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
 
        offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
                                                 tuple->t_len, InvalidOffsetNumber, false, true);
@@ -132,10 +132,13 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
        while (1)
        {
                /* Figure out which pins we need but don't have. */
-               need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
+               need_to_pin_buffer1 =
+                       PageIsAllVisible(BufferGetPage(buffer1, NULL, NULL,
+                                                                                  BGP_NO_SNAPSHOT_TEST))
                        && !visibilitymap_pin_ok(block1, *vmbuffer1);
                need_to_pin_buffer2 = buffer2 != InvalidBuffer
-                       && PageIsAllVisible(BufferGetPage(buffer2))
+                       && PageIsAllVisible(BufferGetPage(buffer2, NULL, NULL,
+                                                                                         BGP_NO_SNAPSHOT_TEST))
                        && !visibilitymap_pin_ok(block2, *vmbuffer2);
                if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
                        return;
@@ -168,6 +171,75 @@ GetVisibilityMapPins(Relation relation, Buffer buffer1, Buffer buffer2,
        }
 }
 
+/*
+ * Extend a relation by multiple blocks to avoid future contention on the
+ * relation extension lock.  Our goal is to pre-extend the relation by an
+ * amount which ramps up as the degree of contention ramps up, but limiting
+ * the result to some sane overall value.
+ */
+static void
+RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
+{
+       Page            page;
+       BlockNumber     blockNum = InvalidBlockNumber,
+                               firstBlock = InvalidBlockNumber;
+       int                     extraBlocks = 0;
+       int                     lockWaiters = 0;
+       Size            freespace = 0;
+       Buffer          buffer;
+
+       /* Use the length of the lock wait queue to judge how much to extend. */
+       lockWaiters = RelationExtensionLockWaiterCount(relation);
+       if (lockWaiters <= 0)
+               return;
+
+       /*
+        * It might seem like multiplying the number of lock waiters by as much
+        * as 20 is too aggressive, but benchmarking revealed that smaller numbers
+        * were insufficient.  512 is just an arbitrary cap to prevent pathological
+        * results.
+        */
+       extraBlocks = Min(512, lockWaiters * 20);
+
+       while (extraBlocks-- >= 0)
+       {
+               /* Ouch - an unnecessary lseek() each time through the loop! */
+               buffer = ReadBufferBI(relation, P_NEW, bistate);
+
+               /* Extend by one page. */
+               LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+               page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+               PageInit(page, BufferGetPageSize(buffer), 0);
+               MarkBufferDirty(buffer);
+               blockNum = BufferGetBlockNumber(buffer);
+               freespace = PageGetHeapFreeSpace(page);
+               UnlockReleaseBuffer(buffer);
+
+               /* Remember first block number thus added. */
+               if (firstBlock == InvalidBlockNumber)
+                       firstBlock = blockNum;
+
+               /*
+                * Immediately update the bottom level of the FSM.  This has a good
+                * chance of making this page visible to other concurrently inserting
+                * backends, and we want that to happen without delay.
+                */
+               RecordPageWithFreeSpace(relation, blockNum, freespace);
+       }
+
+       /*
+        * Updating the upper levels of the free space map is too expensive
+        * to do for every block, but it's worth doing once at the end to make
+        * sure that subsequent insertion activity sees all of those nifty free
+        * pages we just inserted.
+        *
+        * Note that we're using the freespace value that was reported for the
+        * last block we added as if it were the freespace value for every block
+        * we added.  That's actually true, because they're all equally empty.
+        */
+       UpdateFreeSpaceMap(relation, firstBlock, blockNum, freespace);
+}
+
 /*
  * RelationGetBufferForTuple
  *
@@ -233,8 +305,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
        bool            use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
        Buffer          buffer = InvalidBuffer;
        Page            page;
-       Size            pageFreeSpace,
-                               saveFreeSpace;
+       Size            pageFreeSpace = 0,
+                               saveFreeSpace = 0;
        BlockNumber targetBlock,
                                otherBlock;
        bool            needLock;
@@ -308,6 +380,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
                }
        }
 
+loop:
        while (targetBlock != InvalidBlockNumber)
        {
                /*
@@ -327,7 +400,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
                {
                        /* easy case */
                        buffer = ReadBufferBI(relation, targetBlock, bistate);
-                       if (PageIsAllVisible(BufferGetPage(buffer)))
+                       if (PageIsAllVisible(BufferGetPage(buffer, NULL, NULL,
+                                                                                          BGP_NO_SNAPSHOT_TEST)))
                                visibilitymap_pin(relation, targetBlock, vmbuffer);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                }
@@ -335,7 +409,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
                {
                        /* also easy case */
                        buffer = otherBuffer;
-                       if (PageIsAllVisible(BufferGetPage(buffer)))
+                       if (PageIsAllVisible(BufferGetPage(buffer, NULL, NULL,
+                                                                                          BGP_NO_SNAPSHOT_TEST)))
                                visibilitymap_pin(relation, targetBlock, vmbuffer);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                }
@@ -343,7 +418,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
                {
                        /* lock other buffer first */
                        buffer = ReadBuffer(relation, targetBlock);
-                       if (PageIsAllVisible(BufferGetPage(buffer)))
+                       if (PageIsAllVisible(BufferGetPage(buffer, NULL, NULL,
+                                                                                          BGP_NO_SNAPSHOT_TEST)))
                                visibilitymap_pin(relation, targetBlock, vmbuffer);
                        LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
@@ -352,7 +428,8 @@ RelationGetBufferForTuple(Relation relation, Size len,
                {
                        /* lock target buffer first */
                        buffer = ReadBuffer(relation, targetBlock);
-                       if (PageIsAllVisible(BufferGetPage(buffer)))
+                       if (PageIsAllVisible(BufferGetPage(buffer, NULL, NULL,
+                                                                                          BGP_NO_SNAPSHOT_TEST)))
                                visibilitymap_pin(relation, targetBlock, vmbuffer);
                        LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
                        LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
@@ -392,7 +469,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
                 * Now we can check to see if there's enough free space here. If so,
                 * we're done.
                 */
-               page = BufferGetPage(buffer);
+               page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
                pageFreeSpace = PageGetHeapFreeSpace(page);
                if (len + saveFreeSpace <= pageFreeSpace)
                {
@@ -440,10 +517,46 @@ RelationGetBufferForTuple(Relation relation, Size len,
         */
        needLock = !RELATION_IS_LOCAL(relation);
 
+       /*
+        * If we need the lock but are not able to acquire it immediately, we'll
+        * consider extending the relation by multiple blocks at a time to manage
+        * contention on the relation extension lock.  However, this only makes
+        * sense if we're using the FSM; otherwise, there's no point.
+        */
        if (needLock)
-               LockRelationForExtension(relation, ExclusiveLock);
+       {
+               if (!use_fsm)
+                       LockRelationForExtension(relation, ExclusiveLock);
+               else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
+               {
+                       /* Couldn't get the lock immediately; wait for it. */
+                       LockRelationForExtension(relation, ExclusiveLock);
+
+                       /*
+                        * Check if some other backend has extended a block for us while
+                        * we were waiting on the lock.
+                        */
+                       targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
+
+                       /*
+                        * If some other waiter has already extended the relation, we
+                        * don't need to do so; just use the existing freespace.
+                        */
+                       if (targetBlock != InvalidBlockNumber)
+                       {
+                               UnlockRelationForExtension(relation, ExclusiveLock);
+                               goto loop;
+                       }
+
+                       /* Time to bulk-extend. */
+                       RelationAddExtraBlocks(relation, bistate);
+               }
+       }
 
        /*
+        * In addition to whatever extension we performed above, we always add
+        * at least one block to satisfy our own request.
+        *
         * XXX This does an lseek - rather expensive - but at the moment it is the
         * only way to accurately determine how many blocks are in a relation.  Is
         * it worth keeping an accurate file length in shared memory someplace,
@@ -477,7 +590,7 @@ RelationGetBufferForTuple(Relation relation, Size len,
         * is empty (this should never happen, but if it does we don't want to
         * risk wiping out valid data).
         */
-       page = BufferGetPage(buffer);
+       page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
 
        if (!PageIsNew(page))
                elog(ERROR, "page %u of relation \"%s\" should be empty but is not",