*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.188 2005/04/28 21:47:10 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.189 2005/04/30 19:03:32 tgl Exp $
*
*
* INTERFACE ROUTINES
ItemPointer ctid, CommandId cid,
Snapshot crosscheck, bool wait)
{
+ HTSU_Result result;
TransactionId xid = GetCurrentTransactionId();
ItemId lp;
HeapTupleData tp;
PageHeader dp;
Buffer buffer;
- HTSU_Result result;
+ bool have_tuple_lock = false;
Assert(ItemPointerIsValid(tid));
TransactionId xwait;
uint16 infomask;
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(tp.t_data);
+ infomask = tp.t_data->t_infomask;
+
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Acquire tuple lock to establish our priority for the tuple
+ * (see heap_lock_tuple). LockTuple will release us when we are
+ * next-in-line for the tuple.
+ *
+ * If we are forced to "start over" below, we keep the tuple lock;
+ * this arranges that we stay at the head of the line while
+ * rechecking tuple state.
+ */
+ if (!have_tuple_lock)
+ {
+ LockTuple(relation, &(tp.t_self), ExclusiveLock);
+ have_tuple_lock = true;
+ }
+
/*
* Sleep until concurrent transaction ends. Note that we don't care
* if the locker has an exclusive or shared lock, because we need
* exclusive.
*/
- /* must copy state data before unlocking buffer */
- xwait = HeapTupleHeaderGetXmax(tp.t_data);
- infomask = tp.t_data->t_infomask;
-
if (infomask & HEAP_XMAX_IS_MULTI)
{
/* wait for multixact */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
MultiXactIdWait((MultiXactId) xwait);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
else
{
/* wait for regular transaction to end */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
XactLockTableWait(xwait);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
*ctid = tp.t_data->t_ctid;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
+ if (have_tuple_lock)
+ UnlockTuple(relation, &(tp.t_self), ExclusiveLock);
return result;
}
WriteBuffer(buffer);
+ /*
+ * Release the lmgr tuple lock, if we had it.
+ */
+ if (have_tuple_lock)
+ UnlockTuple(relation, &(tp.t_self), ExclusiveLock);
+
return HeapTupleMayBeUpdated;
}
ItemPointer ctid, CommandId cid,
Snapshot crosscheck, bool wait)
{
+ HTSU_Result result;
TransactionId xid = GetCurrentTransactionId();
ItemId lp;
HeapTupleData oldtup;
already_marked;
Size newtupsize,
pagefree;
- HTSU_Result result;
+ bool have_tuple_lock = false;
Assert(ItemPointerIsValid(otid));
TransactionId xwait;
uint16 infomask;
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
+ infomask = oldtup.t_data->t_infomask;
+
+ LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Acquire tuple lock to establish our priority for the tuple
+ * (see heap_lock_tuple). LockTuple will release us when we are
+ * next-in-line for the tuple.
+ *
+ * If we are forced to "start over" below, we keep the tuple lock;
+ * this arranges that we stay at the head of the line while
+ * rechecking tuple state.
+ */
+ if (!have_tuple_lock)
+ {
+ LockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+ have_tuple_lock = true;
+ }
+
/*
* Sleep until concurrent transaction ends. Note that we don't care
* if the locker has an exclusive or shared lock, because we need
* exclusive.
*/
- /* must copy state data before unlocking buffer */
- xwait = HeapTupleHeaderGetXmax(oldtup.t_data);
- infomask = oldtup.t_data->t_infomask;
-
if (infomask & HEAP_XMAX_IS_MULTI)
{
/* wait for multixact */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
MultiXactIdWait((MultiXactId) xwait);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
else
{
/* wait for regular transaction to end */
- LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
XactLockTableWait(xwait);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
*ctid = oldtup.t_data->t_ctid;
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
ReleaseBuffer(buffer);
+ if (have_tuple_lock)
+ UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
return result;
}
*/
CacheInvalidateHeapTuple(relation, newtup);
+ /*
+ * Release the lmgr tuple lock, if we had it.
+ */
+ if (have_tuple_lock)
+ UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock);
+
return HeapTupleMayBeUpdated;
}
/*
* heap_lock_tuple - lock a tuple in shared or exclusive mode
+ *
+ * NOTES: because the shared-memory lock table is of finite size, but users
+ * could reasonably want to lock large numbers of tuples, we do not rely on
+ * the standard lock manager to store tuple-level locks over the long term.
+ * Instead, a tuple is marked as locked by setting the current transaction's
+ * XID as its XMAX, and setting additional infomask bits to distinguish this
+ * usage from the more normal case of having deleted the tuple. When
+ * multiple transactions concurrently share-lock a tuple, the first locker's
+ * XID is replaced in XMAX with a MultiTransactionId representing the set of
+ * XIDs currently holding share-locks.
+ *
+ * When it is necessary to wait for a tuple-level lock to be released, the
+ * basic delay is provided by XactLockTableWait or MultiXactIdWait on the
+ * contents of the tuple's XMAX. However, that mechanism will release all
+ * waiters concurrently, so there would be a race condition as to which
+ * waiter gets the tuple, potentially leading to indefinite starvation of
+ * some waiters. The possibility of share-locking makes the problem much
+ * worse --- a steady stream of share-lockers can easily block an exclusive
+ * locker forever. To provide more reliable semantics about who gets a
+ * tuple-level lock first, we use the standard lock manager. The protocol
+ * for waiting for a tuple-level lock is really
+ * LockTuple()
+ * XactLockTableWait()
+ * mark tuple as locked by me
+ * UnlockTuple()
+ * When there are multiple waiters, arbitration of who is to get the lock next
+ * is provided by LockTuple(). However, at most one tuple-level lock will
+ * be held or awaited per backend at any time, so we don't risk overflow
+ * of the lock table. Note that incoming share-lockers are required to
+ * do LockTuple as well, if there is any conflict, to ensure that they don't
+ * starve out waiting exclusive-lockers. However, if there is not any active
+ * conflict for a tuple, we don't incur any extra overhead.
*/
HTSU_Result
heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer,
CommandId cid, LockTupleMode mode)
{
- TransactionId xid;
+ HTSU_Result result;
ItemPointer tid = &(tuple->t_self);
ItemId lp;
PageHeader dp;
- HTSU_Result result;
+ TransactionId xid;
uint16 new_infomask;
+ LOCKMODE tuple_lock_type;
+ bool have_tuple_lock = false;
+
+ tuple_lock_type = (mode == LockTupleShared) ? ShareLock : ExclusiveLock;
*buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid));
LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
}
else if (result == HeapTupleBeingUpdated)
{
- if (mode == LockTupleShared &&
- (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK))
- result = HeapTupleMayBeUpdated;
- else
+ TransactionId xwait;
+ uint16 infomask;
+
+ /* must copy state data before unlocking buffer */
+ xwait = HeapTupleHeaderGetXmax(tuple->t_data);
+ infomask = tuple->t_data->t_infomask;
+
+ LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+
+ /*
+ * Acquire tuple lock to establish our priority for the tuple.
+ * LockTuple will release us when we are next-in-line for the
+ * tuple. We must do this even if we are share-locking.
+ *
+ * If we are forced to "start over" below, we keep the tuple lock;
+ * this arranges that we stay at the head of the line while
+ * rechecking tuple state.
+ */
+ if (!have_tuple_lock)
{
- TransactionId xwait;
- uint16 infomask;
+ LockTuple(relation, tid, tuple_lock_type);
+ have_tuple_lock = true;
+ }
+ if (mode == LockTupleShared && (infomask & HEAP_XMAX_SHARED_LOCK))
+ {
/*
- * Sleep until concurrent transaction ends.
+ * Acquiring sharelock when there's at least one sharelocker
+ * already. We need not wait for him/them to complete.
*/
+ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
- /* must copy state data before unlocking buffer */
- xwait = HeapTupleHeaderGetXmax(tuple->t_data);
- infomask = tuple->t_data->t_infomask;
-
- if (infomask & HEAP_XMAX_IS_MULTI)
- {
- /* wait for multixact */
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
- MultiXactIdWait((MultiXactId) xwait);
- LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
-
- /*
- * If xwait had just locked the tuple then some other xact
- * could update this tuple before we get to this point.
- * Check for xmax change, and start over if so.
- */
- if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
- !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
- xwait))
- goto l3;
+ /*
+ * Make sure it's still a shared lock, else start over. (It's
+ * OK if the ownership of the shared lock has changed, though.)
+ */
+ if (!(tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK))
+ goto l3;
+ }
+ else if (infomask & HEAP_XMAX_IS_MULTI)
+ {
+ /* wait for multixact to end */
+ MultiXactIdWait((MultiXactId) xwait);
+ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
- /*
- * You might think the multixact is necessarily done here, but
- * not so: it could have surviving members, namely our own xact
- * or other subxacts of this backend. It is legal for us to
- * lock the tuple in either case, however. We don't bother
- * changing the on-disk hint bits since we are about to
- * overwrite the xmax altogether.
- */
- }
- else
- {
- /* wait for regular transaction to end */
- LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
- XactLockTableWait(xwait);
- LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
+ /*
+ * If xwait had just locked the tuple then some other xact
+ * could update this tuple before we get to this point.
+ * Check for xmax change, and start over if so.
+ */
+ if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
+ xwait))
+ goto l3;
- /*
- * xwait is done, but if xwait had just locked the tuple then
- * some other xact could update this tuple before we get to
- * this point. Check for xmax change, and start over if so.
- */
- if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
- !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
- xwait))
- goto l3;
-
- /* Otherwise we can mark it committed or aborted */
- if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED |
- HEAP_XMAX_INVALID)))
- {
- if (TransactionIdDidCommit(xwait))
- tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
- else
- tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
- SetBufferCommitInfoNeedsSave(*buffer);
- }
- }
+ /*
+ * You might think the multixact is necessarily done here, but
+ * not so: it could have surviving members, namely our own xact
+ * or other subxacts of this backend. It is legal for us to
+ * lock the tuple in either case, however. We don't bother
+ * changing the on-disk hint bits since we are about to
+ * overwrite the xmax altogether.
+ */
+ }
+ else
+ {
+ /* wait for regular transaction to end */
+ XactLockTableWait(xwait);
+ LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE);
/*
- * We may lock if previous xmax aborted, or if it committed
- * but only locked the tuple without updating it.
+ * xwait is done, but if xwait had just locked the tuple then
+ * some other xact could update this tuple before we get to
+ * this point. Check for xmax change, and start over if so.
*/
- if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
- HEAP_IS_LOCKED))
- result = HeapTupleMayBeUpdated;
- else
- result = HeapTupleUpdated;
+ if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) ||
+ !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data),
+ xwait))
+ goto l3;
+
+ /* Otherwise we can mark it committed or aborted */
+ if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED |
+ HEAP_XMAX_INVALID)))
+ {
+ if (TransactionIdDidCommit(xwait))
+ tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED;
+ else
+ tuple->t_data->t_infomask |= HEAP_XMAX_INVALID;
+ SetBufferCommitInfoNeedsSave(*buffer);
+ }
}
+
+ /*
+ * We may lock if previous xmax aborted, or if it committed
+ * but only locked the tuple without updating it. The case where
+ * we didn't wait because we are joining an existing shared lock
+ * is correctly handled, too.
+ */
+ if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID |
+ HEAP_IS_LOCKED))
+ result = HeapTupleMayBeUpdated;
+ else
+ result = HeapTupleUpdated;
}
if (result != HeapTupleMayBeUpdated)
{
+ ItemPointerData newctid = tuple->t_data->t_ctid;
+
Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated);
- tuple->t_self = tuple->t_data->t_ctid;
LockBuffer(*buffer, BUFFER_LOCK_UNLOCK);
+ if (have_tuple_lock)
+ UnlockTuple(relation, tid, tuple_lock_type);
+ /* can't overwrite t_self (== *tid) until after above Unlock */
+ tuple->t_self = newctid;
return result;
}
WriteNoReleaseBuffer(*buffer);
+ /*
+ * Now that we have successfully marked the tuple as locked, we can
+ * release the lmgr tuple lock, if we had it.
+ */
+ if (have_tuple_lock)
+ UnlockTuple(relation, tid, tuple_lock_type);
+
return HeapTupleMayBeUpdated;
}
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.72 2005/04/29 22:28:24 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.73 2005/04/30 19:03:33 tgl Exp $
*
*-------------------------------------------------------------------------
*/
LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
}
+/*
+ * LockTuple
+ *
+ * Obtain a tuple-level lock. This is used in a less-than-intuitive fashion
+ * because we can't afford to keep a separate lock in shared memory for every
+ * tuple. See heap_lock_tuple before using this!
+ *
+ * The lock is taken in the top transaction's name and is identified by the
+ * tuple's physical location (block number + offset within the relation).
+ */
+void
+LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_TUPLE(tag,
+ relation->rd_lockInfo.lockRelId.dbId,
+ relation->rd_lockInfo.lockRelId.relId,
+ ItemPointerGetBlockNumber(tid),
+ ItemPointerGetOffsetNumber(tid));
+
+ /*
+ * NOTE(review): the final "false" argument presumably means dontWait =
+ * false (block until granted), matching the other lock routines in this
+ * file — confirm against the LockAcquire signature.
+ */
+ if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(),
+ lockmode, false))
+ elog(ERROR, "LockAcquire failed");
+}
+
+/*
+ * UnlockTuple
+ *
+ * Release a tuple-level lock previously acquired with LockTuple, using the
+ * same lockmode and tuple identity that were passed to LockTuple.
+ */
+void
+UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_TUPLE(tag,
+ relation->rd_lockInfo.lockRelId.dbId,
+ relation->rd_lockInfo.lockRelId.relId,
+ ItemPointerGetBlockNumber(tid),
+ ItemPointerGetOffsetNumber(tid));
+
+ LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+}
+
+
/*
* XactLockTableInsert
*
if (!TransactionIdDidCommit(xid) && !TransactionIdDidAbort(xid))
TransactionIdAbort(xid);
}
+
+
+/*
+ * LockDatabaseObject
+ *
+ * Obtain a lock on a general object of the current database. Don't use
+ * this for shared objects (such as tablespaces). It's usually unwise to
+ * apply it to entire relations, also, since a lock taken this way will
+ * NOT conflict with LockRelation.
+ *
+ * The lock tag is built from MyDatabaseId plus the (classid, objid,
+ * objsubid) triple identifying the object, and is held in the name of
+ * the top transaction.
+ */
+void
+LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
+ LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_OBJECT(tag,
+ MyDatabaseId,
+ classid,
+ objid,
+ objsubid);
+
+ if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(),
+ lockmode, false))
+ elog(ERROR, "LockAcquire failed");
+}
+
+/*
+ * UnlockDatabaseObject
+ *
+ * Release a lock previously taken with LockDatabaseObject; the tag is
+ * rebuilt from the same (classid, objid, objsubid) and MyDatabaseId.
+ */
+void
+UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
+ LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_OBJECT(tag,
+ MyDatabaseId,
+ classid,
+ objid,
+ objsubid);
+
+ LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+}
+
+/*
+ * LockSharedObject
+ *
+ * Obtain a lock on a shared-across-databases object. Identical to
+ * LockDatabaseObject except that InvalidOid is used in place of
+ * MyDatabaseId in the lock tag, marking the object as database-independent.
+ */
+void
+LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
+ LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_OBJECT(tag,
+ InvalidOid,
+ classid,
+ objid,
+ objsubid);
+
+ if (!LockAcquire(LockTableId, &tag, GetTopTransactionId(),
+ lockmode, false))
+ elog(ERROR, "LockAcquire failed");
+}
+
+/*
+ * UnlockSharedObject
+ *
+ * Release a lock previously taken with LockSharedObject (InvalidOid tag,
+ * matching lockmode).
+ */
+void
+UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
+ LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ SET_LOCKTAG_OBJECT(tag,
+ InvalidOid,
+ classid,
+ objid,
+ objsubid);
+
+ LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+}