return without waiting if we can't get the lock immediately).
Not used yet, but will be needed for concurrent VACUUM.
either version 2, or (at your option) any later version.
-This loadable module, together with my user-lock.patch applied to the
-backend, provides support for user-level long-term cooperative locks.
-For example one can write:
+This loadable module provides support for user-level long-term cooperative
+locks. For example one can write:
select some_fields, user_write_lock_oid(oid) from table where id='key';
You can also ignore the failure and go ahead but this could produce conflicts
or inconsistent data in your application. User locks require a cooperative
behavior between users. User locks don't interfere with the normal locks
-used by postgres for transaction processing.
+used by Postgres for transaction processing.
This could also be done by setting a flag in the record itself but in
this case you have the overhead of the updates to the records and there
could be some locks not released if the backend or the application crashes
before resetting the lock flag.
It could also be done with a begin/end block but in this case the entire
-table would be locked by postgres and it is not acceptable to do this for
+table would be locked by Postgres and it is not acceptable to do this for
a long period because other transactions would block completely.
The generic user locks use two values, group and id, to identify a lock,
The meaning of group and id is defined by the application. The user
lock code just takes two numbers and tells you if the corresponding
-entity has been succesfully locked. What this mean is up to you.
+entity has been successfully locked. What this means is up to you.
-My succestion is that you use the group to identify an area of your
+My suggestion is that you use the group to identify an area of your
application and the id to identify an object in this area.
Or you can just lock the oid of the tuples which are by definition unique.
Note also that a process can acquire more than one lock on the same entity
and it must release the lock the corresponding number of times. This can
-be done calling the unlock funtion until it returns 0.
+be done by calling the unlock function until it returns 0.
/*
* user_locks.c --
*
- * This loadable module, together with my user-lock.patch applied to the
- * backend, provides support for user-level long-term cooperative locks.
+ * This loadable module provides support for user-level long-term
+ * cooperative locks.
*
* Copyright (C) 1999, Massimo Dal Zotto <dz@cs.unitn.it>
*
* This software is distributed under the GNU General Public License
* either version 2, or (at your option) any later version.
*/
-
-#include <stdio.h>
-#include <string.h>
-#include <unistd.h>
-
#include "postgres.h"
+
#include "miscadmin.h"
#include "storage/lmgr.h"
#include "storage/proc.h"
-#include "utils/elog.h"
#include "user_locks.h"
+
int
user_lock(uint32 id1, uint32 id2, LOCKMODE lockmode)
{
tag.objId.blkno = (BlockNumber) id2;
tag.offnum = (OffsetNumber) (id1 & 0xffff);
- return LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId, lockmode);
+ return LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId,
+ lockmode, true);
}
int
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.47 2001/06/19 19:42:16 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.48 2001/06/22 00:04:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
static LOCKMASK LockConflicts[] = {
- (int) NULL,
+ 0,
-/* AccessShareLock */
+ /* AccessShareLock */
(1 << AccessExclusiveLock),
-/* RowShareLock */
+ /* RowShareLock */
(1 << ExclusiveLock) | (1 << AccessExclusiveLock),
-/* RowExclusiveLock */
+ /* RowExclusiveLock */
(1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) | (1 << ShareLock) |
(1 << AccessExclusiveLock),
-/* ShareLock */
+ /* ShareLock */
(1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) |
(1 << RowExclusiveLock) | (1 << AccessExclusiveLock),
-/* ShareRowExclusiveLock */
+ /* ShareRowExclusiveLock */
(1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) |
(1 << ShareLock) | (1 << RowExclusiveLock) | (1 << AccessExclusiveLock),
-/* ExclusiveLock */
+ /* ExclusiveLock */
(1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) | (1 << ShareLock) |
(1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock),
-/* AccessExclusiveLock */
+ /* AccessExclusiveLock */
(1 << ExclusiveLock) | (1 << ShareRowExclusiveLock) | (1 << ShareLock) |
- (1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock) |
- (1 << AccessShareLock),
+ (1 << RowExclusiveLock) | (1 << RowShareLock) |
+ (1 << AccessExclusiveLock) | (1 << AccessShareLock)
};
static int LockPrios[] = {
- (int) NULL,
+ 0,
+ /* AccessShareLock */
1,
+ /* RowShareLock */
2,
+ /* RowExclusiveLock */
3,
+ /* ShareLock */
4,
+ /* ShareRowExclusiveLock */
5,
+ /* ExclusiveLock */
6,
+ /* AccessExclusiveLock */
7
};
tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
tag.objId.blkno = InvalidBlockNumber;
- if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), lockmode))
+ if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(),
+ lockmode, false))
elog(ERROR, "LockRelation: LockAcquire failed");
/*
RelationDecrementReferenceCount(relation);
}
+/*
+ * ConditionalLockRelation
+ *
+ * As above, but only lock if we can get the lock without blocking.
+ * Returns TRUE iff the lock was acquired.
+ *
+ * NOTE: we do not currently need conditional versions of the other
+ * LockXXX routines in this file, but they could easily be added if needed.
+ */
+bool
+ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
+{
+ LOCKTAG tag;
+
+ if (LockingDisabled())
+ return true;
+
+ MemSet(&tag, 0, sizeof(tag));
+ tag.relId = relation->rd_lockInfo.lockRelId.relId;
+ tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
+ tag.objId.blkno = InvalidBlockNumber;
+
+ if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(),
+ lockmode, true))
+ return false;
+
+ /*
+ * Check to see if the relcache entry has been invalidated while we
+ * were waiting to lock it. If so, rebuild it, or elog() trying.
+ * Increment the refcount to ensure that RelationFlushRelation will
+ * rebuild it and not just delete it.
+ */
+ RelationIncrementReferenceCount(relation);
+ AcceptInvalidationMessages();
+ RelationDecrementReferenceCount(relation);
+
+ return true;
+}
+
/*
* UnlockRelation
*/
tag.dbId = relid->dbId;
tag.objId.blkno = InvalidBlockNumber;
- if (!LockAcquire(LockTableId, &tag, InvalidTransactionId, lockmode))
+ if (!LockAcquire(LockTableId, &tag, InvalidTransactionId,
+ lockmode, false))
elog(ERROR, "LockRelationForSession: LockAcquire failed");
}
tag.dbId = relation->rd_lockInfo.lockRelId.dbId;
tag.objId.blkno = blkno;
- if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), lockmode))
+ if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(),
+ lockmode, false))
elog(ERROR, "LockPage: LockAcquire failed");
}
tag.dbId = InvalidOid; /* xids are globally unique */
tag.objId.xid = xid;
- if (!LockAcquire(LockTableId, &tag, xid, ExclusiveLock))
+ if (!LockAcquire(LockTableId, &tag, xid,
+ ExclusiveLock, false))
elog(ERROR, "XactLockTableInsert: LockAcquire failed");
}
tag.dbId = InvalidOid;
tag.objId.xid = xid;
- if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(), ShareLock))
+ if (!LockAcquire(LockTableId, &tag, GetCurrentTransactionId(),
+ ShareLock, false))
elog(ERROR, "XactLockTableWait: LockAcquire failed");
LockRelease(LockTableId, &tag, GetCurrentTransactionId(), ShareLock);
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.88 2001/03/22 03:59:46 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.89 2001/06/22 00:04:59 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
/*
* Disable flag
- *
*/
static bool LockingIsDisabled;
/*
* map from lockmethod to the lock table structure
- *
*/
static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
/*
* InitLocks -- Init the lock module. Create a private data
* structure for constructing conflict masks.
- *
*/
void
InitLocks(void)
/*
* LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
- *
*/
void
LockDisable(bool status)
/*
* Boolean function to determine current locking status
- *
*/
bool
LockingDisabled(void)
long init_table_size,
max_table_size;
- if (numModes > MAX_LOCKMODES)
+ if (numModes >= MAX_LOCKMODES)
{
elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
numModes, MAX_LOCKMODES);
/*
* find/acquire the spinlock for the table
- *
*/
SpinAcquire(LockMgrLock);
/*
* allocate a control structure from shared memory or attach to it if
* it already exists.
- *
*/
sprintf(shmemName, "%s (ctl)", tabName);
lockMethodTable->ctl = (LOCKMETHODCTL *)
/*
* no zero-th table
- *
*/
NumLockMethods = 1;
/*
* we're first - initialize
- *
*/
if (!found)
{
/*
* other modules refer to the lock table by a lockmethod ID
- *
*/
LockMethodTable[NumLockMethods] = lockMethodTable;
NumLockMethods++;
/*
* allocate a hash table for LOCK structs. This is used to store
* per-locked-object information.
- *
*/
info.keysize = SHMEM_LOCKTAB_KEYSIZE;
info.datasize = SHMEM_LOCKTAB_DATASIZE;
/*
* allocate a hash table for HOLDER structs. This is used to store
* per-lock-holder information.
- *
*/
info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
info.datasize = SHMEM_HOLDERTAB_DATASIZE;
* LockAcquire -- Check for lock conflicts, sleep if conflict found,
* set lock if/when no conflicts.
*
- * Returns: TRUE if parameters are correct, FALSE otherwise.
+ * Returns: TRUE if lock was acquired, FALSE otherwise. Note that
+ * a FALSE return is to be expected if dontWait is TRUE;
+ * but if dontWait is FALSE, only a parameter error can cause
+ * a FALSE return. (XXX probably we should just elog on parameter
+ * errors, instead of conflating this with failure to acquire lock?)
*
- * Side Effects: The lock is always acquired. No way to abort
- * a lock acquisition other than aborting the transaction.
- * Lock is recorded in the lkchain.
+ * Side Effects: The lock is acquired and recorded in lock tables.
+ *
+ * NOTE: if we wait for the lock, there is no way to abort the wait
+ * short of aborting the transaction.
*
*
* Note on User Locks:
bool
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
- TransactionId xid, LOCKMODE lockmode)
+ TransactionId xid, LOCKMODE lockmode, bool dontWait)
{
HOLDER *holder;
HOLDERTAG holdertag;
/*
* if it's a new lock object, initialize it
- *
*/
if (!found)
{
/*
* Create the hash key for the holder table.
- *
*/
MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding,
* needed */
* lock->nRequested and lock->requested[] count the total number of
* requests, whether granted or waiting, so increment those
* immediately. The other counts don't increment till we get the lock.
- *
*/
lock->nRequested++;
lock->requested[lockmode]++;
/*
* If I already hold one or more locks of the requested type, just
* grant myself another one without blocking.
- *
*/
if (holder->holding[lockmode] > 0)
{
/*
* If this process (under any XID) is a holder of the lock, also grant
* myself another one without blocking.
- *
*/
LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
if (myHolding[lockmode] > 0)
* If lock requested conflicts with locks requested by waiters, must
* join wait queue. Otherwise, check for conflict with already-held
* locks. (That's last because most complex check.)
- *
*/
if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
status = STATUS_FOUND;
else
{
Assert(status == STATUS_FOUND);
-#ifdef USER_LOCKS
-
/*
- * User locks are non blocking. If we can't acquire a lock we must
- * remove the holder entry and return FALSE without waiting.
+ * We can't acquire the lock immediately. If caller specified no
+ * blocking, remove the holder entry and return FALSE without waiting.
*/
- if (lockmethod == USER_LOCKMETHOD)
+ if (dontWait)
{
if (holder->nHolding == 0)
{
HOLDER_PRINT("LockAcquire: NHOLDING", holder);
lock->nRequested--;
lock->requested[lockmode]--;
- LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
+ LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
SpinRelease(masterLock);
return FALSE;
}
-#endif /* USER_LOCKS */
/*
* Construct bitmask of locks this process holds on this object.
*
* The caller can optionally pass the process's total holding counts, if
* known. If NULL is passed then these values will be computed internally.
- *
*/
int
LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
* currently held locks. conflictTable[lockmode] has a bit set for
* each type of lock that conflicts with request. Bitwise compare
* tells if there is a conflict.
- *
*/
if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
{
* have to construct a conflict mask that does not reflect our own
* locks. Locks held by the current process under another XID also
* count as "our own locks".
- *
*/
if (myHolding == NULL)
{
* now check again for conflicts. 'bitmask' describes the types of
* locks held by other processes. If one of these conflicts with the
* kind of lock that I want, there is a conflict and I have to sleep.
- *
*/
if (!(lockctl->conflictTab[lockmode] & bitmask))
{
return STATUS_OK;
}
-/*--------------------
+/*
* Remove a proc from the wait-queue it is on
* (caller must know it is on one).
*
* during a subsequent LockReleaseAll call, which we expect will happen
* during transaction cleanup. (Removal of a proc from its wait queue by
* this routine can only happen if we are aborting the transaction.)
- *--------------------
*/
void
RemoveFromWaitQueue(PROC *proc)
* positive. But that's not true anymore, because the remaining
* granted locks might belong to some waiter, who could now be
* awakened because he doesn't conflict with his own locks.
- *
*/
if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
wakeupNeeded = true;
/*
* if there's no one waiting in the queue, we just released the
* last lock on this object. Delete it from the lock table.
- *
*/
Assert(lockMethodTable->lockHash->hash == tag_hash);
lock = (LOCK *) hash_search(lockMethodTable->lockHash,
/*
* fix the general lock stats
- *
*/
if (lock->nRequested != holder->nHolding)
{
}
else
{
-
/*
* This holder accounts for all the requested locks on the
* object, so we can be lazy and just zero things out.
- *
*/
lock->nRequested = 0;
lock->nGranted = 0;
/*
* We've just released the last lock, so garbage-collect the
* lock object.
- *
*/
LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
Assert(lockMethodTable->lockHash->hash == tag_hash);
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: lmgr.h,v 1.30 2001/03/22 04:01:07 momjian Exp $
+ * $Id: lmgr.h,v 1.31 2001/06/22 00:04:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
#define AccessExclusiveLock 7 /* ALTER TABLE, DROP TABLE,
* VACUUM, and unqualified LOCK
* TABLE */
+
+/*
+ * Note: all lock mode numbers must be less than lock.h's MAX_LOCKMODES,
+ * so increase that if you want to add more modes.
+ */
+
extern LOCKMETHOD LockTableId;
/* Lock a relation */
extern void LockRelation(Relation relation, LOCKMODE lockmode);
+extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
extern void LockRelationForSession(LockRelId *relid, LOCKMODE lockmode);
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: lock.h,v 1.48 2001/03/22 04:01:07 momjian Exp $
+ * $Id: lock.h,v 1.49 2001/06/22 00:04:59 tgl Exp $
*
*-------------------------------------------------------------------------
*/
extern bool Trace_userlocks;
extern int Trace_lock_table;
extern bool Debug_deadlocks;
-
#endif /* LOCK_DEBUG */
#define INVALID_LOCKMETHOD INVALID_TABLEID
#define DEFAULT_LOCKMETHOD 1
#define USER_LOCKMETHOD 2
-#define MIN_LOCKMETHOD DEFAULT_LOCKMETHOD
-/* There is normally only one lock method, the default one.
- * If user locks are enabled, an additional lock method is present
+/*
+ * There is normally only one lock method, the default one.
+ * If user locks are enabled, an additional lock method is present.
*
* LOCKMETHODCTL and LOCKMETHODTABLE are split because the first lives
* in shared memory. This is because it contains a spinlock.
* LOCKMETHODTABLE exists in private memory. Both are created by the
- * postmaster and should be the same in all backends
+ * postmaster and should be the same in all backends.
*/
/*
int *prioP, int numModes, int maxBackends);
extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
extern bool LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
- TransactionId xid, LOCKMODE lockmode);
+ TransactionId xid, LOCKMODE lockmode, bool dontWait);
extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode);
extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,