From: Tom Lane Date: Fri, 22 Jun 2001 00:04:59 +0000 (+0000) Subject: Add support to lock manager for conditionally locking a lock (ie, X-Git-Tag: REL7_2_BETA1~987 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=d8d9ed931e8a2370d3995c40af2eb3bda18aecb0;p=postgresql Add support to lock manager for conditionally locking a lock (ie, return without waiting if we can't get the lock immediately). Not used yet, but will be needed for concurrent VACUUM. --- diff --git a/contrib/userlock/README.user_locks b/contrib/userlock/README.user_locks index 4c923a4657..48acdd1d5f 100644 --- a/contrib/userlock/README.user_locks +++ b/contrib/userlock/README.user_locks @@ -5,9 +5,8 @@ This software is distributed under the GNU General Public License either version 2, or (at your option) any later version. -This loadable module, together with my user-lock.patch applied to the -backend, provides support for user-level long-term cooperative locks. -For example one can write: +This loadable module provides support for user-level long-term cooperative +locks. For example one can write: select some_fields, user_write_lock_oid(oid) from table where id='key'; @@ -26,14 +25,14 @@ After you have finished your work on that item you can do: You can also ignore the failure and go ahead but this could produce conflicts or inconsistent data in your application. User locks require a cooperative behavior between users. User locks don't interfere with the normal locks -used by postgres for transaction processing. +used by Postgres for transaction processing. This could also be done by setting a flag in the record itself but in this case you have the overhead of the updates to the records and there could be some locks not released if the backend or the application crashes before resetting the lock flag. It could also be done with a begin/end block but in this case the entire -table would be locked by postgres and it is not acceptable to do this for +table would be locked by Postgres and it is not acceptable to do this for a long period because other transactions would block completely. The generic user locks use two values, group and id, to identify a lock, @@ -44,12 +43,12 @@ use a group equal to 0. The meaning of group and id is defined by the application. The user lock code just takes two numbers and tells you if the corresponding -entity has been succesfully locked. What this mean is up to you. +entity has been successfully locked. What this means is up to you. -My succestion is that you use the group to identify an area of your +My suggestion is that you use the group to identify an area of your application and the id to identify an object in this area. Or you can just lock the oid of the tuples which are by definition unique. Note also that a process can acquire more than one lock on the same entity and it must release the lock the corresponding number of times. This can -be done calling the unlock funtion until it returns 0. +be done by calling the unlock function until it returns 0. diff --git a/contrib/userlock/user_locks.c b/contrib/userlock/user_locks.c index 567d2f817f..ae33c40cc2 100644 --- a/contrib/userlock/user_locks.c +++ b/contrib/userlock/user_locks.c @@ -1,27 +1,23 @@ /* * user_locks.c -- * - * This loadable module, together with my user-lock.patch applied to the - * backend, provides support for user-level long-term cooperative locks. + * This loadable module provides support for user-level long-term + * cooperative locks. * * Copyright (C) 1999, Massimo Dal Zotto * * This software is distributed under the GNU General Public License * either version 2, or (at your option) any later version. */ - -#include -#include -#include - #include "postgres.h" + #include "miscadmin.h" #include "storage/lmgr.h" #include "storage/proc.h" -#include "utils/elog.h" #include "user_locks.h" + int user_lock(uint32 id1, uint32 id2, LOCKMODE lockmode) { @@ -33,7 +29,8 @@ user_lock(uint32 id1, uint32 id2, LOCKMODE lockmode) tag.objId.blkno = (BlockNumber) id2; tag.offnum = (OffsetNumber) (id1 & 0xffff); - return LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId, lockmode); + return LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId, + lockmode, true); } int diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c index e645e92b22..fa3812a87e 100644 --- a/src/backend/storage/lmgr/lmgr.c +++ b/src/backend/storage/lmgr/lmgr.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.47 2001/06/19 19:42:16 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.48 2001/06/22 00:04:59 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -24,45 +24,52 @@ static LOCKMASK LockConflicts[] = { - (int) NULL, + 0, -/* AccessShareLock */ + /* AccessShareLock */ (1 << AccessExclusiveLock), -/* RowShareLock */ + /* RowShareLock */ (1 << ExclusiveLock) | (1 << AccessExclusiveLock), -/* RowExclusiveLock */ + /* RowExclusiveLock */ (1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) | (1 << ShareLock) | (1 << AccessExclusiveLock), -/* ShareLock */ + /* ShareLock */ (1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) | (1 << RowExclusiveLock) | (1 << AccessExclusiveLock), -/* ShareRowExclusiveLock */ + /* ShareRowExclusiveLock */ (1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) | (1 << ShareLock) | (1 << RowExclusiveLock) | (1 << AccessExclusiveLock), -/* ExclusiveLock */ + /* ExclusiveLock */ (1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) | (1 << ShareLock) | (1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock), -/* AccessExclusiveLock */ + /* AccessExclusiveLock */ (1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) | (1 << ShareLock) | - (1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock) | - (1 << AccessShareLock), + (1 << RowExclusiveLock) | (1 << RowShareLock) | + (1 << AccessExclusiveLock) | (1 << AccessShareLock) }; static int LockPrios[] = { - (int) NULL, + 0, + /* AccessShareLock */ 1, + /* RowShareLock */ 2, + /* RowExclusiveLock */ 3, + /* ShareLock */ 4, + /* ShareRowExclusiveLock */ 5, + /* ExclusiveLock */ 6, + /* AccessExclusiveLock */ 7 }; @@ -134,7 +141,8 @@ LockRelation(Relation relation, LOCKMODE lockmode) tag.dbId = relation->rd_lockInfo.lockRelId.dbId; tag.objId.blkno = InvalidBlockNumber; - if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), lockmode)) + if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), + lockmode, false)) elog(ERROR, "LockRelation: LockAcquire failed"); /* @@ -148,6 +156,45 @@ LockRelation(Relation relation, LOCKMODE lockmode) RelationDecrementReferenceCount(relation); } +/* + * ConditionalLockRelation + * + * As above, but only lock if we can get the lock without blocking. + * Returns TRUE iff the lock was acquired. + * + * NOTE: we do not currently need conditional versions of the other + * LockXXX routines in this file, but they could easily be added if needed. + */ +bool +ConditionalLockRelation(Relation relation, LOCKMODE lockmode) +{ + LOCKTAG tag; + + if (LockingDisabled()) + return true; + + MemSet(&tag, 0, sizeof(tag)); + tag.relId = relation->rd_lockInfo.lockRelId.relId; + tag.dbId = relation->rd_lockInfo.lockRelId.dbId; + tag.objId.blkno = InvalidBlockNumber; + + if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), + lockmode, true)) + return false; + + /* + * Check to see if the relcache entry has been invalidated while we + * were waiting to lock it. If so, rebuild it, or elog() trying. + * Increment the refcount to ensure that RelationFlushRelation will + * rebuild it and not just delete it. + */ + RelationIncrementReferenceCount(relation); + AcceptInvalidationMessages(); + RelationDecrementReferenceCount(relation); + + return true; +} + /* * UnlockRelation */ @@ -192,7 +239,8 @@ LockRelationForSession(LockRelId *relid, LOCKMODE lockmode) tag.dbId = relid->dbId; tag.objId.blkno = InvalidBlockNumber; - if (!LockAcquire(LockTableId, &tag, InvalidTransactionId, lockmode)) + if (!LockAcquire(LockTableId, &tag, InvalidTransactionId, + lockmode, false)) elog(ERROR, "LockRelationForSession: LockAcquire failed"); } @@ -231,7 +279,8 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode) tag.dbId = relation->rd_lockInfo.lockRelId.dbId; tag.objId.blkno = blkno; - if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), lockmode)) + if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), + lockmode, false)) elog(ERROR, "LockPage: LockAcquire failed"); } @@ -267,7 +316,8 @@ XactLockTableInsert(TransactionId xid) tag.dbId = InvalidOid; /* xids are globally unique */ tag.objId.xid = xid; - if (!LockAcquire(LockTableId, &tag, xid, ExclusiveLock)) + if (!LockAcquire(LockTableId, &tag, xid, + ExclusiveLock, false)) elog(ERROR, "XactLockTableInsert: LockAcquire failed"); } @@ -303,7 +353,8 @@ XactLockTableWait(TransactionId xid) tag.dbId = InvalidOid; tag.objId.xid = xid; - if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), ShareLock)) + if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), + ShareLock, false)) elog(ERROR, "XactLockTableWait: LockAcquire failed"); LockRelease(LockTableId, &tag, GetCurrentTransactionId(), ShareLock); diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 912a25ff22..577b420797 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.88 2001/03/22 03:59:46 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.89 2001/06/22 00:04:59 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -161,13 +161,11 @@ static LOCKMASK BITS_ON[MAX_LOCKMODES]; /* * Disable flag - * */ static bool LockingIsDisabled; /* * map from lockmethod to the lock table structure - * */ static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS]; @@ -176,7 +174,6 @@ static int NumLockMethods; /* * InitLocks -- Init the lock module. Create a private data * structure for constructing conflict masks. - * */ void InitLocks(void) @@ -194,7 +191,6 @@ InitLocks(void) /* * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE. - * */ void LockDisable(bool status) @@ -204,7 +200,6 @@ LockDisable(bool status) /* * Boolean function to determine current locking status - * */ bool LockingDisabled(void) @@ -278,7 +273,7 @@ LockMethodTableInit(char *tabName, long init_table_size, max_table_size; - if (numModes > MAX_LOCKMODES) + if (numModes >= MAX_LOCKMODES) { elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d", numModes, MAX_LOCKMODES); @@ -299,14 +294,12 @@ LockMethodTableInit(char *tabName, /* * find/acquire the spinlock for the table - * */ SpinAcquire(LockMgrLock); /* * allocate a control structure from shared memory or attach to it if * it already exists. - * */ sprintf(shmemName, "%s (ctl)", tabName); lockMethodTable->ctl = (LOCKMETHODCTL *) @@ -317,13 +310,11 @@ LockMethodTableInit(char *tabName, /* * no zero-th table - * */ NumLockMethods = 1; /* * we're first - initialize - * */ if (!found) { @@ -334,7 +325,6 @@ LockMethodTableInit(char *tabName, /* * other modules refer to the lock table by a lockmethod ID - * */ LockMethodTable[NumLockMethods] = lockMethodTable; NumLockMethods++; @@ -343,7 +333,6 @@ LockMethodTableInit(char *tabName, /* * allocate a hash table for LOCK structs. This is used to store * per-locked-object information. - * */ info.keysize = SHMEM_LOCKTAB_KEYSIZE; info.datasize = SHMEM_LOCKTAB_DATASIZE; @@ -364,7 +353,6 @@ LockMethodTableInit(char *tabName, /* * allocate a hash table for HOLDER structs. This is used to store * per-lock-holder information. - * */ info.keysize = SHMEM_HOLDERTAB_KEYSIZE; info.datasize = SHMEM_HOLDERTAB_DATASIZE; @@ -426,11 +414,16 @@ LockMethodTableRename(LOCKMETHOD lockmethod) * LockAcquire -- Check for lock conflicts, sleep if conflict found, * set lock if/when no conflicts. * - * Returns: TRUE if parameters are correct, FALSE otherwise. + * Returns: TRUE if lock was acquired, FALSE otherwise. Note that + * a FALSE return is to be expected if dontWait is TRUE; + * but if dontWait is FALSE, only a parameter error can cause + * a FALSE return. (XXX probably we should just elog on parameter + * errors, instead of conflating this with failure to acquire lock?) * - * Side Effects: The lock is always acquired. No way to abort - * a lock acquisition other than aborting the transaction. - * Lock is recorded in the lkchain. + * Side Effects: The lock is acquired and recorded in lock tables. + * + * NOTE: if we wait for the lock, there is no way to abort the wait + * short of aborting the transaction. * * * Note on User Locks: @@ -480,7 +473,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod) bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, - TransactionId xid, LOCKMODE lockmode) + TransactionId xid, LOCKMODE lockmode, bool dontWait) { HOLDER *holder; HOLDERTAG holdertag; @@ -532,7 +525,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * if it's a new lock object, initialize it - * */ if (!found) { @@ -556,7 +548,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * Create the hash key for the holder table. - * */ MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, * needed */ @@ -632,7 +623,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * lock->nRequested and lock->requested[] count the total number of * requests, whether granted or waiting, so increment those * immediately. The other counts don't increment till we get the lock. - * */ lock->nRequested++; lock->requested[lockmode]++; @@ -641,7 +631,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * If I already hold one or more locks of the requested type, just * grant myself another one without blocking. - * */ if (holder->holding[lockmode] > 0) { @@ -654,7 +643,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * If this process (under any XID) is a holder of the lock, also grant * myself another one without blocking. - * */ LockCountMyLocks(holder->tag.lock, MyProc, myHolding); if (myHolding[lockmode] > 0) @@ -669,7 +657,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * If lock requested conflicts with locks requested by waiters, must * join wait queue. Otherwise, check for conflict with already-held * locks. (That's last because most complex check.) - * */ if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) status = STATUS_FOUND; @@ -686,13 +673,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, else { Assert(status == STATUS_FOUND); -#ifdef USER_LOCKS - /* - * User locks are non blocking. If we can't acquire a lock we must - * remove the holder entry and return FALSE without waiting. + * We can't acquire the lock immediately. If caller specified no + * blocking, remove the holder entry and return FALSE without waiting. */ - if (lockmethod == USER_LOCKMETHOD) + if (dontWait) { if (holder->nHolding == 0) { @@ -708,13 +693,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, HOLDER_PRINT("LockAcquire: NHOLDING", holder); lock->nRequested--; lock->requested[lockmode]--; - LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode); + LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); SpinRelease(masterLock); return FALSE; } -#endif /* USER_LOCKS */ /* * Construct bitmask of locks this process holds on this object. @@ -781,7 +765,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * * The caller can optionally pass the process's total holding counts, if * known. If NULL is passed then these values will be computed internally. - * */ int LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, @@ -806,7 +789,6 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, * currently held locks. conflictTable[lockmode] has a bit set for * each type of lock that conflicts with request. Bitwise compare * tells if there is a conflict. - * */ if (!(lockctl->conflictTab[lockmode] & lock->grantMask)) { @@ -819,7 +801,6 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, * have to construct a conflict mask that does not reflect our own * locks. Locks held by the current process under another XID also * count as "our own locks". - * */ if (myHolding == NULL) { @@ -841,7 +822,6 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, * now check again for conflicts. 'bitmask' describes the types of * locks held by other processes. If one of these conflicts with the * kind of lock that I want, there is a conflict and I have to sleep. - * */ if (!(lockctl->conflictTab[lockmode] & bitmask)) { @@ -977,7 +957,7 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, return STATUS_OK; } -/*-------------------- +/* * Remove a proc from the wait-queue it is on * (caller must know it is on one). * @@ -988,7 +968,6 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, * during a subsequent LockReleaseAll call, which we expect will happen * during transaction cleanup. (Removal of a proc from its wait queue by * this routine can only happen if we are aborting the transaction.) - *-------------------- */ void RemoveFromWaitQueue(PROC *proc) @@ -1164,7 +1143,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, * positive. But that's not true anymore, because the remaining * granted locks might belong to some waiter, who could now be * awakened because he doesn't conflict with his own locks. - * */ if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) wakeupNeeded = true; @@ -1175,7 +1153,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * if there's no one waiting in the queue, we just released the * last lock on this object. Delete it from the lock table. - * */ Assert(lockMethodTable->lockHash->hash == tag_hash); lock = (LOCK *) hash_search(lockMethodTable->lockHash, @@ -1305,7 +1282,6 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, /* * fix the general lock stats - * */ if (lock->nRequested != holder->nHolding) { @@ -1335,11 +1311,9 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, } else { - /* * This holder accounts for all the requested locks on the * object, so we can be lazy and just zero things out. - * */ lock->nRequested = 0; lock->nGranted = 0; @@ -1380,7 +1354,6 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, /* * We've just released the last lock, so garbage-collect the * lock object. - * */ LOCK_PRINT("LockReleaseAll: deleting", lock, 0); Assert(lockMethodTable->lockHash->hash == tag_hash); diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h index 45af8beded..c279ef63f2 100644 --- a/src/include/storage/lmgr.h +++ b/src/include/storage/lmgr.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: lmgr.h,v 1.30 2001/03/22 04:01:07 momjian Exp $ + * $Id: lmgr.h,v 1.31 2001/06/22 00:04:59 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -33,6 +33,12 @@ #define AccessExclusiveLock 7 /* ALTER TABLE, DROP TABLE, * VACUUM, and unqualified LOCK * TABLE */ + +/* + * Note: all lock mode numbers must be less than lock.h's MAX_LOCKMODES, + * so increase that if you want to add more modes. + */ + extern LOCKMETHOD LockTableId; @@ -41,6 +47,7 @@ extern void RelationInitLockInfo(Relation relation); /* Lock a relation */ extern void LockRelation(Relation relation, LOCKMODE lockmode); +extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode); extern void UnlockRelation(Relation relation, LOCKMODE lockmode); extern void LockRelationForSession(LockRelId *relid, LOCKMODE lockmode); diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 3d5ddb7038..2428a782a6 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Id: lock.h,v 1.48 2001/03/22 04:01:07 momjian Exp $ + * $Id: lock.h,v 1.49 2001/06/22 00:04:59 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -38,7 +38,6 @@ extern bool Trace_locks; extern bool Trace_userlocks; extern int Trace_lock_table; extern bool Debug_deadlocks; - #endif /* LOCK_DEBUG */ @@ -75,15 +74,15 @@ typedef int LOCKMETHOD; #define INVALID_LOCKMETHOD INVALID_TABLEID #define DEFAULT_LOCKMETHOD 1 #define USER_LOCKMETHOD 2 -#define MIN_LOCKMETHOD DEFAULT_LOCKMETHOD -/* There is normally only one lock method, the default one. - * If user locks are enabled, an additional lock method is present +/* + * There is normally only one lock method, the default one. + * If user locks are enabled, an additional lock method is present. * * LOCKMETHODCTL and LOCKMETHODTABLE are split because the first lives * in shared memory. This is because it contains a spinlock. * LOCKMETHODTABLE exists in private memory. Both are created by the - * postmaster and should be the same in all backends + * postmaster and should be the same in all backends. */ /* @@ -263,7 +262,7 @@ extern LOCKMETHOD LockMethodTableInit(char *tabName, LOCKMASK *conflictsP, int *prioP, int numModes, int maxBackends); extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod); extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, - TransactionId xid, LOCKMODE lockmode); + TransactionId xid, LOCKMODE lockmode, bool dontWait); extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode); extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,