*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.2 1996/07/30 07:47:33 scrappy Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.3 1996/10/11 03:22:56 scrappy Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
* Side Effects: The lock is always acquired. No way to abort
* a lock acquisition other than aborting the transaction.
* Lock is recorded in the lkchain.
+#ifdef USER_LOCKS
+ * Note on User Locks:
+ * User locks are handled totally on the application side as
+ * long term cooperative locks which extend beyond the normal
+ * transaction boundaries. Their purpose is to indicate to an
+ * application that someone is `working' on an item. So it is
+ * possible to put an user lock on a tuple's oid, retrieve the
+ * tuple, work on it for an hour and then update it and remove
+ * the lock. While the lock is active other clients can still
+ * read and write the tuple but they can be aware that it has
+ * been locked at the application level by someone.
+ * User locks use lock tags made of an uint16 and an uint32, for
+ * example 0 and a tuple oid, or any other arbitrary pair of
+ * numbers following a convention established by the application.
+ * In this sense tags don't refer to tuples or database entities.
+ * User locks and normal locks are completely orthogonal and
+ * they don't interfere with each other, so it is possible
+ * to acquire a normal lock on an user-locked tuple or user-lock
+ * a tuple for which a normal write lock already exists.
+ * User locks are always non blocking, therefore they are never
+ * acquired if already held by another process. They must be
+ * released explicitly by the application but they are released
+ * automatically when a backend terminates.
+ * They are indicated by a dummy tableId 0 which doesn't have
+ * any table allocated but uses the normal lock table, and are
+ * distinguished from normal locks for the following differences:
+ *
+ * normal lock user lock
+ *
+ * tableId 1 0
+ * tag.relId rel oid 0
+ * tag.ItemPointerData.ip_blkid block id lock id2
+ * tag.ItemPointerData.ip_posid tuple offset lock id1
+ * xid.pid 0 backend pid
+ * xid.xid current xid 0
+ * persistence transaction user or backend
+ *
+ * The lockt parameter can have the same values for normal locks
+ * although probably only WRITE_LOCK can have some practical use.
+ *
+ * DZ - 4 Oct 1996
+#endif
*/
bool
LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
int status;
TransactionId myXid;
+#ifdef USER_LOCKS
+ int is_user_lock;
+
+ is_user_lock = (tableId == 0);
+ if (is_user_lock) {
+ tableId = 1;
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockAcquire: user lock tag [%u,%u] %d",
+ lockName->tupleId.ip_posid,
+ ((lockName->tupleId.ip_blkid.bi_hi<<16)+
+ lockName->tupleId.ip_blkid.bi_lo),
+ lockt);
+#endif
+ }
+#endif
+
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable)
item.tag.pid = MyPid;
#endif
+#ifdef USER_LOCKS
+ if (is_user_lock) {
+ item.tag.pid = getpid();
+ item.tag.xid = myXid = 0;
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockAcquire: user lock xid [%d,%d,%d]",
+ item.tag.lock, item.tag.pid, item.tag.xid);
+#endif
+ }
+#endif
+
result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found);
if (!result)
{
}
else if (status == STATUS_FOUND)
{
+#ifdef USER_LOCKS
+ /*
+ * User locks are non blocking. If we can't acquire a lock
+ * remove the xid entry and return FALSE without waiting.
+ */
+ if (is_user_lock) {
+ if (!result->nHolding) {
+ SHMQueueDelete(&result->queue);
+ hash_search(xidTable, (Pointer)&item, HASH_REMOVE, &found);
+ }
+ lock->nHolding--;
+ lock->holders[lockt]--;
+ SpinRelease(masterLock);
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockAcquire: user lock failed");
+#endif
+ return(FALSE);
+ }
+#endif
status = WaitOnLock(ltable, tableId, lock, lockt);
XID_PRINT("Someone granted me the lock", result);
}
HTAB *xidTable;
bool wakeupNeeded = true;
+#ifdef USER_LOCKS
+ int is_user_lock;
+
+ is_user_lock = (tableId == 0);
+ if (is_user_lock) {
+ tableId = 1;
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockRelease: user lock tag [%u,%u] %d",
+ lockName->tupleId.ip_posid,
+ ((lockName->tupleId.ip_blkid.bi_hi<<16)+
+ lockName->tupleId.ip_blkid.bi_lo),
+ lockt);
+#endif
+ }
+#endif
+
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable) {
lock = (LOCK *)
hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND_SAVE,&found);
+#ifdef USER_LOCKS
+ /*
+ * If the entry is not found hash_search returns TRUE
+ * instead of NULL, so we must check it explicitly.
+ */
+ if ((is_user_lock) && (lock == (LOCK *)TRUE)) {
+ SpinRelease(masterLock);
+ elog(NOTICE,"LockRelease: there are no locks with this tag");
+ return(FALSE);
+ }
+#endif
+
/* let the caller print its own error message, too.
* Do not elog(WARN).
*/
Assert(lock->nHolding > 0);
+#ifdef USER_LOCKS
+ /*
+ * If this is an user lock it can be removed only after
+ * checking that it was acquired by the current process,
+ * so this code is skipped and executed later.
+ */
+ if (!is_user_lock) {
+#endif
/*
* fix the general lock stats
*/
Assert(lock && found);
wakeupNeeded = false;
}
+#ifdef USER_LOCKS
+ }
+#endif
/* ------------------
* Zero out all of the tag bytes (this clears the padding bytes for long
item.tag.pid = MyPid;
#endif
+#ifdef USER_LOCKS
+ if (is_user_lock) {
+ item.tag.pid = getpid();
+ item.tag.xid = 0;
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockRelease: user lock xid [%d,%d,%d]",
+ item.tag.lock, item.tag.pid, item.tag.xid);
+#endif
+ }
+#endif
+
if (! ( result = (XIDLookupEnt *) hash_search(xidTable,
(Pointer)&item,
HASH_FIND_SAVE,
|| !found)
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ if ((is_user_lock) && (result)) {
+ elog(NOTICE,"LockRelease: you don't have a lock on this tag");
+ } else {
+ elog(NOTICE,"LockRelease: find xid, table corrupted");
+ }
+#else
elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
return(FALSE);
}
/*
*/
if (! result->nHolding)
{
+#ifdef USER_LOCKS
+ if (result->queue.prev == INVALID_OFFSET) {
+ elog(NOTICE,"LockRelease: xid.prev == INVALID_OFFSET");
+ }
+ if (result->queue.next == INVALID_OFFSET) {
+ elog(NOTICE,"LockRelease: xid.next == INVALID_OFFSET");
+ }
+#endif
if (result->queue.next != INVALID_OFFSET)
SHMQueueDelete(&result->queue);
if (! (result = (XIDLookupEnt *)
! found)
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ elog(NOTICE,"LockRelease: remove xid, table corrupted");
+#else
elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
return(FALSE);
}
}
+#ifdef USER_LOCKS
+ /*
+ * If this is an user lock remove it now, after the
+ * corresponding xid entry has been found and deleted.
+ */
+ if (is_user_lock) {
+ /*
+ * fix the general lock stats
+ */
+ lock->nHolding--;
+ lock->holders[lockt]--;
+ lock->nActive--;
+ lock->activeHolders[lockt]--;
+
+ Assert(lock->nActive >= 0);
+
+ if (! lock->nHolding)
+ {
+ /* ------------------
+ * if there's no one waiting in the queue,
+ * we just released the last lock.
+ * Delete it from the lock table.
+ * ------------------
+ */
+ Assert( ltable->lockHash->hash == tag_hash);
+ lock = (LOCK *) hash_search(ltable->lockHash,
+ (Pointer) &(lock->tag),
+ HASH_REMOVE,
+ &found);
+ Assert(lock && found);
+ wakeupNeeded = false;
+ }
+ }
+#endif
+
/* --------------------------
* If there are still active locks of the type I just released, no one
* should be woken up. Whoever is asleep will still conflict
lock->mask |= BITS_ON[lockt];
}
+#ifdef USER_LOCKS
+/*
+ * LockReleaseAll -- Release all locks in a process lock queue.
+ *
+ * Note: This code is a little complicated by the presence in the
+ * same queue of user locks which can't be removed from the
+ * normal lock queue at the end of a transaction. They must
+ * however be removed when the backend exits.
+ * A dummy tableId 0 is used to indicate that we are releasing
+ * the user locks, from the code added to ProcKill().
+ */
+#endif
bool
LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
{
LOCK *lock;
bool found;
+#ifdef USER_LOCKS
+ int is_user_lock_table, my_pid, count, nskip;
+
+ is_user_lock_table = (tableId == 0);
+ my_pid = getpid();
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockReleaseAll: tableId=%d, pid=%d", tableId, my_pid);
+#endif
+ if (is_user_lock_table) {
+ tableId = 1;
+ }
+#endif
+
Assert (tableId < NumTables);
ltable = AllTables[tableId];
if (!ltable)
if (SHMQueueEmpty(lockQueue))
return TRUE;
+#ifdef USER_LOCKS
+ SpinAcquire(masterLock);
+#endif
SHMQueueFirst(lockQueue,(Pointer*)&xidLook,&xidLook->queue);
XID_PRINT("LockReleaseAll:", xidLook);
+#ifndef USER_LOCKS
SpinAcquire(masterLock);
+#else
+ count = nskip = 0;
+#endif
for (;;)
{
/* ---------------------------
LOCK_PRINT("ReleaseAll",(&lock->tag),0);
+#ifdef USER_LOCKS
+ /*
+ * Sometimes the queue appears to be messed up.
+ */
+ if (count++ > 2000) {
+ elog(NOTICE,"LockReleaseAll: xid loop detected, giving up");
+ nskip = 0;
+ break;
+ }
+ if (is_user_lock_table) {
+ if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0)) {
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockReleaseAll: skip normal lock [%d,%d,%d]",
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ nskip++;
+ goto next_item;
+ }
+ if (xidLook->tag.pid != my_pid) {
+ /* This should never happen */
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,
+ "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
+ lock->tag.tupleId.ip_posid,
+ ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+ lock->tag.tupleId.ip_blkid.bi_lo),
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ nskip++;
+ goto next_item;
+ }
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,
+ "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
+ lock->tag.tupleId.ip_posid,
+ ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+ lock->tag.tupleId.ip_blkid.bi_lo),
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ } else {
+ if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0)) {
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,
+ "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
+ lock->tag.tupleId.ip_posid,
+ ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
+ lock->tag.tupleId.ip_blkid.bi_lo),
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ nskip++;
+ goto next_item;
+ }
+#ifdef USER_LOCKS_DEBUG
+ elog(NOTICE,"LockReleaseAll: release normal lock [%d,%d,%d]",
+ xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
+#endif
+ }
+#endif
+
/* ------------------
* fix the general lock stats
* ------------------
* always remove the xidLookup entry, we're done with it now
* ----------------
*/
+#ifdef USER_LOCKS
+ SHMQueueDelete(&xidLook->queue);
+#endif
if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found))
|| !found)
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ elog(NOTICE,"LockReleaseAll: xid table corrupted");
+#else
elog(NOTICE,"LockReplace: xid table corrupted");
+#endif
return(FALSE);
}
if ((! lock) || (!found))
{
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ elog(NOTICE,"LockReleaseAll: cannot remove lock from HTAB");
+#else
elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
+#endif
return(FALSE);
}
}
(void) ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
}
+#ifdef USER_LOCKS
+ next_item:
+#endif
if (done)
break;
SHMQueueFirst(&xidLook->queue,(Pointer*)&tmp,&tmp->queue);
xidLook = tmp;
}
SpinRelease(masterLock);
+#ifdef USER_LOCKS
+ /*
+ * Reinitialize the queue only if nothing has been left in.
+ */
+ if (nskip == 0)
+#endif
SHMQueueInit(lockQueue);
return TRUE;
}