*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.114 2001/05/12 19:58:27 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.115 2001/05/16 22:35:12 tgl Exp $
*
*
* INTERFACE ROUTINES
#endif
/* Find buffer for this tuple */
- buffer = RelationGetBufferForTuple(relation, tup->t_len);
+ buffer = RelationGetBufferForTuple(relation, tup->t_len, 0);
/* NO ELOG(ERROR) from here till changes are logged */
START_CRIT_SECTION();
newbuf;
bool need_toast,
already_marked;
+ Size newtupsize,
+ pagefree;
int result;
/* increment access statistics */
HeapTupleHasExtended(newtup) ||
(MAXALIGN(newtup->t_len) > TOAST_TUPLE_THRESHOLD));
- if (need_toast ||
- (unsigned) MAXALIGN(newtup->t_len) > PageGetFreeSpace((Page) dp))
+ newtupsize = MAXALIGN(newtup->t_len);
+ pagefree = PageGetFreeSpace((Page) dp);
+
+ if (need_toast || newtupsize > pagefree)
{
_locked_tuple_.node = relation->rd_node;
_locked_tuple_.tid = oldtup.t_self;
/* Let the toaster do its thing */
if (need_toast)
+ {
heap_tuple_toast_attrs(relation, newtup, &oldtup);
+ newtupsize = MAXALIGN(newtup->t_len);
+ }
- /* Now, do we need a new page for the tuple, or not? */
- if ((unsigned) MAXALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp))
- newbuf = buffer;
+ /*
+ * Now, do we need a new page for the tuple, or not? This is a bit
+ * tricky since someone else could have added tuples to the page
+ * while we weren't looking. We have to recheck the available space
+ * after reacquiring the buffer lock. But don't bother to do that
+ * if the former amount of free space is still not enough; it's
+ * unlikely there's more free now than before.
+ *
+ * What's more, if we need to get a new page, we will need to acquire
+ * buffer locks on both old and new pages. To avoid deadlock against
+ * some other backend trying to get the same two locks in the other
+ * order, we must be consistent about the order we get the locks in.
+ * We use the rule "lock the higher-numbered page of the relation
+ * first". To implement this, we must do RelationGetBufferForTuple
+ * while not holding the lock on the old page, and we must tell it
+ * to give us a page beyond the old page.
+ */
+ if (newtupsize > pagefree)
+ {
+ /* Assume there's no chance to put newtup on same page. */
+ newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
+ BufferGetBlockNumber(buffer) + 1);
+ /* Now reacquire lock on old tuple's page. */
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ }
else
- newbuf = RelationGetBufferForTuple(relation, newtup->t_len);
-
- /* Re-acquire the lock on the old tuple's page. */
- /* this seems to be deadlock free... */
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ {
+ /* Re-acquire the lock on the old tuple's page. */
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ /* Re-check using the up-to-date free space */
+ pagefree = PageGetFreeSpace((Page) dp);
+ if (newtupsize > pagefree)
+ {
+ /*
+ * Rats, it doesn't fit anymore. We must now unlock and
+ * relock to avoid deadlock. Fortunately, this path should
+ * seldom be taken.
+ */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ newbuf = RelationGetBufferForTuple(relation, newtup->t_len,
+ BufferGetBlockNumber(buffer) + 1);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ }
+ else
+ {
+ /* OK, it fits here, so we're done. */
+ newbuf = buffer;
+ }
+ }
}
else
{
newbuf = buffer;
}
+ /*
+ * At this point newbuf and buffer are both pinned and locked,
+ * and newbuf has enough space for the new tuple.
+ */
+
/* NO ELOG(ERROR) from here till changes are logged */
START_CRIT_SECTION();
*
*
* IDENTIFICATION
- * $Id: hio.c,v 1.38 2001/05/12 19:58:27 tgl Exp $
+ * $Id: hio.c,v 1.39 2001/05/16 22:35:12 tgl Exp $
*
*-------------------------------------------------------------------------
*/
/*
* RelationGetBufferForTuple
*
- * Returns exclusive-locked buffer with free space >= given len.
+ * Returns exclusive-locked buffer with free space >= given len,
+ * being careful to select only a page at or beyond minblocknum
+ * in the relation.
*
- * Note that we use LockPage to lock relation for extension. We can
- * do this as long as in all other places we use page-level locking
- * for indices only. Alternatively, we could define pseudo-table as
- * we do for transactions with XactLockTable.
+ * The minblocknum parameter is needed to prevent deadlock between
+ * concurrent heap_update operations; see heap_update for details.
+ * Pass zero if you don't particularly care which page you get.
*
- * ELOG(ERROR) is allowed here, so this routine *must* be called
- * before any (unlogged) changes are made in buffer pool.
+ * Note that we use LockPage to lock the relation for extension. We can
+ * do this as long as in all other places we use page-level locking
+ * for indices only. Alternatively, we could define a pseudo-table, as
+ * we do for transactions with XactLockTable.
+ *
+ * ELOG(ERROR) is allowed here, so this routine *must* be called
+ * before any (unlogged) changes are made in buffer pool.
*/
Buffer
-RelationGetBufferForTuple(Relation relation, Size len)
+RelationGetBufferForTuple(Relation relation, Size len,
+ BlockNumber minblocknum)
{
Buffer buffer = InvalidBuffer;
Page pageHeader;
if (relation->rd_nblocks > 0)
{
lastblock = relation->rd_nblocks - 1;
- buffer = ReadBuffer(relation, lastblock);
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- pageHeader = (Page) BufferGetPage(buffer);
- if (len <= PageGetFreeSpace(pageHeader))
- return buffer;
- /*
- * Doesn't fit, so we'll have to try someplace else.
- */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- /* buffer release will happen below... */
+ if (lastblock >= minblocknum)
+ {
+ buffer = ReadBuffer(relation, lastblock);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ pageHeader = (Page) BufferGetPage(buffer);
+ if (len <= PageGetFreeSpace(pageHeader))
+ return buffer;
+ /*
+ * Doesn't fit, so we'll have to try someplace else.
+ */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ /* buffer release will happen below... */
+ }
}
/*
*/
relation->rd_nblocks = RelationGetNumberOfBlocks(relation);
- if (relation->rd_nblocks > oldnblocks)
+ if ((BlockNumber) relation->rd_nblocks > oldnblocks)
{
/*
* Someone else has indeed extended the relation recently.
* Try to fit our tuple into the new last page.
*/
lastblock = relation->rd_nblocks - 1;
- buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false);
- LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
- pageHeader = (Page) BufferGetPage(buffer);
- if (len <= PageGetFreeSpace(pageHeader))
+ if (lastblock >= minblocknum)
{
- /* OK, we don't need to extend again. */
- if (!relation->rd_myxactonly)
- UnlockPage(relation, 0, ExclusiveLock);
- return buffer;
+ buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, false);
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ pageHeader = (Page) BufferGetPage(buffer);
+ if (len <= PageGetFreeSpace(pageHeader))
+ {
+ /* OK, we don't need to extend again. */
+ if (!relation->rd_myxactonly)
+ UnlockPage(relation, 0, ExclusiveLock);
+ return buffer;
+ }
+ /*
+ * Doesn't fit, so we'll have to extend the relation (again).
+ */
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+ /* buffer release will happen below... */
}
- /*
- * Doesn't fit, so we'll have to extend the relation (again).
- */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
- /* buffer release will happen below... */
}
/*
* Extend the relation by one page and update rd_nblocks for next time.
+ *
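+ * Note: at this point minblocknum is ignored; we won't extend by more
+ * than one block. The new page is block rd_nblocks, so the "at or
+ * beyond minblocknum" guarantee holds only if minblocknum is no
+ * greater than rd_nblocks (true when the caller passes an existing
+ * page's block number plus one, as heap_update does).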
*/
lastblock = relation->rd_nblocks;
buffer = ReleaseAndReadBuffer(buffer, relation, lastblock, true);