* hio.c
* POSTGRES heap access method input/output code.
*
- * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
Assert(!token || HeapTupleHeaderIsSpeculative(tuple->t_data));
/* Add the tuple to the page */
- pageHeader = BufferGetPage(buffer);
+ pageHeader = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
offnum = PageAddItem(pageHeader, (Item) tuple->t_data,
tuple->t_len, InvalidOffsetNumber, false, true);
while (1)
{
/* Figure out which pins we need but don't have. */
- need_to_pin_buffer1 = PageIsAllVisible(BufferGetPage(buffer1))
+ need_to_pin_buffer1 =
+ PageIsAllVisible(BufferGetPage(buffer1, NULL, NULL,
+ BGP_NO_SNAPSHOT_TEST))
&& !visibilitymap_pin_ok(block1, *vmbuffer1);
need_to_pin_buffer2 = buffer2 != InvalidBuffer
- && PageIsAllVisible(BufferGetPage(buffer2))
+ && PageIsAllVisible(BufferGetPage(buffer2, NULL, NULL,
+ BGP_NO_SNAPSHOT_TEST))
&& !visibilitymap_pin_ok(block2, *vmbuffer2);
if (!need_to_pin_buffer1 && !need_to_pin_buffer2)
return;
}
}
+/*
+ * Extend a relation by multiple blocks to avoid future contention on the
+ * relation extension lock. Our goal is to pre-extend the relation by an
+ * amount which ramps up as the degree of contention ramps up, but limiting
+ * the result to some sane overall value.
+ */
+static void
+RelationAddExtraBlocks(Relation relation, BulkInsertState bistate)
+{
+ Page page;
+ BlockNumber blockNum = InvalidBlockNumber,
+ firstBlock = InvalidBlockNumber;
+ int extraBlocks = 0;
+ int lockWaiters = 0;
+ Size freespace = 0;
+ Buffer buffer;
+
+ /* Use the length of the lock wait queue to judge how much to extend. */
+ lockWaiters = RelationExtensionLockWaiterCount(relation);
+ if (lockWaiters <= 0)
+ return;
+
+ /*
+ * It might seem like multiplying the number of lock waiters by as much
+ * as 20 is too aggressive, but benchmarking revealed that smaller numbers
+ * were insufficient. 512 is just an arbitrary cap to prevent pathological
+ * results.
+ */
+ extraBlocks = Min(512, lockWaiters * 20);
+
+ while (extraBlocks-- >= 0)
+ {
+ /* Ouch - an unnecessary lseek() each time through the loop! */
+ buffer = ReadBufferBI(relation, P_NEW, bistate);
+
+ /* Extend by one page. */
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
+ PageInit(page, BufferGetPageSize(buffer), 0);
+ MarkBufferDirty(buffer);
+ blockNum = BufferGetBlockNumber(buffer);
+ freespace = PageGetHeapFreeSpace(page);
+ UnlockReleaseBuffer(buffer);
+
+ /* Remember first block number thus added. */
+ if (firstBlock == InvalidBlockNumber)
+ firstBlock = blockNum;
+
+ /*
+ * Immediately update the bottom level of the FSM. This has a good
+ * chance of making this page visible to other concurrently inserting
+ * backends, and we want that to happen without delay.
+ */
+ RecordPageWithFreeSpace(relation, blockNum, freespace);
+ }
+
+ /*
+ * Updating the upper levels of the free space map is too expensive
+ * to do for every block, but it's worth doing once at the end to make
+ * sure that subsequent insertion activity sees all of those nifty free
+ * pages we just inserted.
+ *
+ * Note that we're using the freespace value that was reported for the
+ * last block we added as if it were the freespace value for every block
+ * we added. That's actually true, because they're all equally empty.
+ */
+ UpdateFreeSpaceMap(relation, firstBlock, blockNum, freespace);
+}
+
/*
* RelationGetBufferForTuple
*
bool use_fsm = !(options & HEAP_INSERT_SKIP_FSM);
Buffer buffer = InvalidBuffer;
Page page;
- Size pageFreeSpace,
- saveFreeSpace;
+ Size pageFreeSpace = 0,
+ saveFreeSpace = 0;
BlockNumber targetBlock,
otherBlock;
bool needLock;
}
}
+loop:
while (targetBlock != InvalidBlockNumber)
{
/*
{
/* easy case */
buffer = ReadBufferBI(relation, targetBlock, bistate);
- if (PageIsAllVisible(BufferGetPage(buffer)))
+ if (PageIsAllVisible(BufferGetPage(buffer, NULL, NULL,
+ BGP_NO_SNAPSHOT_TEST)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}
{
/* also easy case */
buffer = otherBuffer;
- if (PageIsAllVisible(BufferGetPage(buffer)))
+ if (PageIsAllVisible(BufferGetPage(buffer, NULL, NULL,
+ BGP_NO_SNAPSHOT_TEST)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
}
{
/* lock other buffer first */
buffer = ReadBuffer(relation, targetBlock);
- if (PageIsAllVisible(BufferGetPage(buffer)))
+ if (PageIsAllVisible(BufferGetPage(buffer, NULL, NULL,
+ BGP_NO_SNAPSHOT_TEST)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
{
/* lock target buffer first */
buffer = ReadBuffer(relation, targetBlock);
- if (PageIsAllVisible(BufferGetPage(buffer)))
+ if (PageIsAllVisible(BufferGetPage(buffer, NULL, NULL,
+ BGP_NO_SNAPSHOT_TEST)))
visibilitymap_pin(relation, targetBlock, vmbuffer);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
LockBuffer(otherBuffer, BUFFER_LOCK_EXCLUSIVE);
* Now we can check to see if there's enough free space here. If so,
* we're done.
*/
- page = BufferGetPage(buffer);
+ page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
pageFreeSpace = PageGetHeapFreeSpace(page);
if (len + saveFreeSpace <= pageFreeSpace)
{
*/
needLock = !RELATION_IS_LOCAL(relation);
+ /*
+ * If we need the lock but are not able to acquire it immediately, we'll
+ * consider extending the relation by multiple blocks at a time to manage
+ * contention on the relation extension lock. However, this only makes
+ * sense if we're using the FSM; otherwise, there's no point.
+ */
if (needLock)
- LockRelationForExtension(relation, ExclusiveLock);
+ {
+ if (!use_fsm)
+ LockRelationForExtension(relation, ExclusiveLock);
+ else if (!ConditionalLockRelationForExtension(relation, ExclusiveLock))
+ {
+ /* Couldn't get the lock immediately; wait for it. */
+ LockRelationForExtension(relation, ExclusiveLock);
+
+ /*
+ * Check if some other backend has extended a block for us while
+ * we were waiting on the lock.
+ */
+ targetBlock = GetPageWithFreeSpace(relation, len + saveFreeSpace);
+
+ /*
+ * If some other waiter has already extended the relation, we
+ * don't need to do so; just use the existing freespace.
+ */
+ if (targetBlock != InvalidBlockNumber)
+ {
+ UnlockRelationForExtension(relation, ExclusiveLock);
+ goto loop;
+ }
+
+ /* Time to bulk-extend. */
+ RelationAddExtraBlocks(relation, bistate);
+ }
+ }
/*
+ * In addition to whatever extension we performed above, we always add
+ * at least one block to satisfy our own request.
+ *
* XXX This does an lseek - rather expensive - but at the moment it is the
* only way to accurately determine how many blocks are in a relation. Is
* it worth keeping an accurate file length in shared memory someplace,
* is empty (this should never happen, but if it does we don't want to
* risk wiping out valid data).
*/
- page = BufferGetPage(buffer);
+ page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
if (!PageIsNew(page))
elog(ERROR, "page %u of relation \"%s\" should be empty but is not",