* lock.c
* POSTGRES low-level lock mechanism
*
- * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.120 2003/02/18 02:13:24 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.150 2005/04/29 22:28:24 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
* Interface:
*
* LockAcquire(), LockRelease(), LockMethodTableInit(),
- * LockMethodTableRename(), LockReleaseAll,
+ * LockMethodTableRename(), LockReleaseAll(),
* LockCheckConflicts(), GrantLock()
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
-#include <unistd.h>
#include <signal.h>
+#include <unistd.h>
#include "access/xact.h"
#include "miscadmin.h"
#include "storage/proc.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
+#include "utils/resowner.h"
/* This configuration variable is used to set the lock table size */
#define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends))
-static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
- LOCK *lock, PROCLOCK *proclock);
-static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
- int *myHolding);
+/*
+ * map from lock method id to the lock table data structures
+ */
+static LockMethod LockMethods[MAX_LOCK_METHODS];
+static HTAB *LockMethodLockHash[MAX_LOCK_METHODS];
+static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS];
+static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS];
-static char *lock_mode_names[] =
+/* exported so lmgr.c can initialize it */
+int NumLockMethods;
+
+
+/* private state for GrantAwaitedLock */
+static LOCALLOCK *awaitedLock;
+static ResourceOwner awaitedOwner;
+
+
+static const char *const lock_mode_names[] =
{
"INVALID",
"AccessShareLock",
* --------
*/
-int Trace_lock_oidmin = BootstrapObjectIdData;
+int Trace_lock_oidmin = FirstNormalObjectId;
bool Trace_locks = false;
bool Trace_userlocks = false;
int Trace_lock_table = 0;
LOCK_DEBUG_ENABLED(const LOCK *lock)
{
return
- (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
- || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
- && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
- || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
+ (((Trace_locks && LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD)
+ || (Trace_userlocks && LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD))
+ && ((Oid) lock->tag.locktag_field2 >= (Oid) Trace_lock_oidmin))
+ || (Trace_lock_table
+ && (lock->tag.locktag_field2 == Trace_lock_table));
}
{
if (LOCK_DEBUG_ENABLED(lock))
elog(LOG,
- "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
+ "%s: lock(%lx) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
"req(%d,%d,%d,%d,%d,%d,%d)=%d "
"grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
where, MAKE_OFFSET(lock),
- lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
- lock->tag.objId.blkno, lock->grantMask,
+ lock->tag.locktag_field1, lock->tag.locktag_field2,
+ lock->tag.locktag_field3, lock->tag.locktag_field4,
+ lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
+ lock->grantMask,
lock->requested[1], lock->requested[2], lock->requested[3],
lock->requested[4], lock->requested[5], lock->requested[6],
lock->requested[7], lock->nRequested,
inline static void
PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
- if (
- (((PROCLOCK_LOCKMETHOD(*proclockP) == DEFAULT_LOCKMETHOD && Trace_locks)
- || (PROCLOCK_LOCKMETHOD(*proclockP) == USER_LOCKMETHOD && Trace_userlocks))
- && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
- || (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table))
- )
+ if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock)))
elog(LOG,
- "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
+ "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)",
where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
PROCLOCK_LOCKMETHOD(*(proclockP)),
proclockP->tag.proc, proclockP->tag.xid,
- proclockP->holding[1], proclockP->holding[2], proclockP->holding[3],
- proclockP->holding[4], proclockP->holding[5], proclockP->holding[6],
- proclockP->holding[7], proclockP->nHolding);
+ (int) proclockP->holdMask);
}
#else /* not LOCK_DEBUG */
#endif /* not LOCK_DEBUG */
-/*
- * These are to simplify/speed up some bit arithmetic.
- *
- * XXX is a fetch from a static array really faster than a shift?
- * Wouldn't bet on it...
- */
-
-static LOCKMASK BITS_OFF[MAX_LOCKMODES];
-static LOCKMASK BITS_ON[MAX_LOCKMODES];
-
-/*
- * map from lockmethod to the lock table structure
- */
-static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
+static void RemoveLocalLock(LOCALLOCK *locallock);
+static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
+static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
+ ResourceOwner owner);
+static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
+ int *myHolding);
+static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
+ PROCLOCK *proclock, LockMethod lockMethodTable);
-static int NumLockMethods;
/*
* InitLocks -- Init the lock module. Create a private data
void
InitLocks(void)
{
- int i;
- int bit;
-
- bit = 1;
- for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
- {
- BITS_ON[i] = bit;
- BITS_OFF[i] = ~bit;
- }
+ /* NOP */
}
/*
* Fetch the lock method table associated with a given lock
*/
-LOCKMETHODTABLE *
+LockMethod
GetLocksMethodTable(LOCK *lock)
{
- LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock);
+ LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
- Assert(lockmethod > 0 && lockmethod < NumLockMethods);
- return LockMethodTable[lockmethod];
+ Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
+ return LockMethods[lockmethodid];
}
* Notes: just copying. Should only be called once.
*/
static void
-LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
- LOCKMASK *conflictsP,
+LockMethodInit(LockMethod lockMethodTable,
+ const LOCKMASK *conflictsP,
int numModes)
{
int i;
lockMethodTable->numLockModes = numModes;
- numModes++;
- for (i = 0; i < numModes; i++, conflictsP++)
- lockMethodTable->conflictTab[i] = *conflictsP;
+ /* copies useless zero element as well as the N lockmodes */
+ for (i = 0; i <= numModes; i++)
+ lockMethodTable->conflictTab[i] = conflictsP[i];
}
/*
* by the postmaster are inherited by each backend, so they must be in
* TopMemoryContext.
*/
-LOCKMETHOD
-LockMethodTableInit(char *tabName,
- LOCKMASK *conflictsP,
+LOCKMETHODID
+LockMethodTableInit(const char *tabName,
+ const LOCKMASK *conflictsP,
int numModes,
int maxBackends)
{
- LOCKMETHODTABLE *lockMethodTable;
+ LockMethod newLockMethod;
+ LOCKMETHODID lockmethodid;
char *shmemName;
HASHCTL info;
int hash_flags;
max_table_size;
if (numModes >= MAX_LOCKMODES)
- {
- elog(WARNING, "LockMethodTableInit: too many lock types %d greater than %d",
- numModes, MAX_LOCKMODES);
- return INVALID_LOCKMETHOD;
- }
+ elog(ERROR, "too many lock types %d (limit is %d)",
+ numModes, MAX_LOCKMODES - 1);
/* Compute init/max size to request for lock hashtables */
max_table_size = NLOCKENTS(maxBackends);
- init_table_size = max_table_size / 10;
+ init_table_size = max_table_size / 2;
/* Allocate a string for the shmem index table lookups. */
/* This is just temp space in this routine, so palloc is OK. */
/* each lock table has a header in shared memory */
sprintf(shmemName, "%s (lock method table)", tabName);
- lockMethodTable = (LOCKMETHODTABLE *)
- ShmemInitStruct(shmemName, sizeof(LOCKMETHODTABLE), &found);
-
- if (!lockMethodTable)
- elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
-
- /*
- * Lock the LWLock for the table (probably not necessary here)
- */
- LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
+ newLockMethod = (LockMethod)
+ ShmemInitStruct(shmemName, sizeof(LockMethodData), &found);
- /*
- * no zero-th table
- */
- NumLockMethods = 1;
+ if (!newLockMethod)
+ elog(FATAL, "could not initialize lock table \"%s\"", tabName);
/*
* we're first - initialize
*/
if (!found)
{
- MemSet(lockMethodTable, 0, sizeof(LOCKMETHODTABLE));
- lockMethodTable->masterLock = LockMgrLock;
- lockMethodTable->lockmethod = NumLockMethods;
+ MemSet(newLockMethod, 0, sizeof(LockMethodData));
+ newLockMethod->masterLock = LockMgrLock;
+ LockMethodInit(newLockMethod, conflictsP, numModes);
}
/*
* other modules refer to the lock table by a lockmethod ID
*/
- LockMethodTable[NumLockMethods] = lockMethodTable;
- NumLockMethods++;
- Assert(NumLockMethods <= MAX_LOCK_METHODS);
+ Assert(NumLockMethods < MAX_LOCK_METHODS);
+ lockmethodid = NumLockMethods++;
+ LockMethods[lockmethodid] = newLockMethod;
/*
* allocate a hash table for LOCK structs. This is used to store
* per-locked-object information.
*/
+ MemSet(&info, 0, sizeof(info));
info.keysize = sizeof(LOCKTAG);
info.entrysize = sizeof(LOCK);
info.hash = tag_hash;
hash_flags = (HASH_ELEM | HASH_FUNCTION);
sprintf(shmemName, "%s (lock hash)", tabName);
- lockMethodTable->lockHash = ShmemInitHash(shmemName,
- init_table_size,
- max_table_size,
- &info,
- hash_flags);
+ LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName,
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
- if (!lockMethodTable->lockHash)
- elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
- Assert(lockMethodTable->lockHash->hash == tag_hash);
+ if (!LockMethodLockHash[lockmethodid])
+ elog(FATAL, "could not initialize lock table \"%s\"", tabName);
/*
* allocate a hash table for PROCLOCK structs. This is used to store
- * per-lock-proclock information.
+ * per-lock-holder information.
*/
info.keysize = sizeof(PROCLOCKTAG);
info.entrysize = sizeof(PROCLOCK);
hash_flags = (HASH_ELEM | HASH_FUNCTION);
sprintf(shmemName, "%s (proclock hash)", tabName);
- lockMethodTable->proclockHash = ShmemInitHash(shmemName,
- init_table_size,
- max_table_size,
- &info,
- hash_flags);
+ LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName,
+ init_table_size,
+ max_table_size,
+ &info,
+ hash_flags);
+
+ if (!LockMethodProcLockHash[lockmethodid])
+ elog(FATAL, "could not initialize lock table \"%s\"", tabName);
- if (!lockMethodTable->proclockHash)
- elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
+ /*
+ * allocate a non-shared hash table for LOCALLOCK structs. This is
+ * used to store lock counts and resource owner information.
+ *
+ * The non-shared table could already exist in this process (this occurs
+ * when the postmaster is recreating shared memory after a backend
+ * crash). If so, delete and recreate it. (We could simply leave it,
+ * since it ought to be empty in the postmaster, but for safety let's
+ * zap it.)
+ */
+ if (LockMethodLocalHash[lockmethodid])
+ hash_destroy(LockMethodLocalHash[lockmethodid]);
- /* init data structures */
- LockMethodInit(lockMethodTable, conflictsP, numModes);
+ info.keysize = sizeof(LOCALLOCKTAG);
+ info.entrysize = sizeof(LOCALLOCK);
+ info.hash = tag_hash;
+ hash_flags = (HASH_ELEM | HASH_FUNCTION);
- LWLockRelease(LockMgrLock);
+ sprintf(shmemName, "%s (locallock hash)", tabName);
+ LockMethodLocalHash[lockmethodid] = hash_create(shmemName,
+ 128,
+ &info,
+ hash_flags);
pfree(shmemName);
- return lockMethodTable->lockmethod;
+ return lockmethodid;
}
/*
* LockMethodTableRename -- allocate another lockmethod ID to the same
* lock table.
*
- * NOTES: Both the lock module and the lock chain (lchain.c)
- * module use table id's to distinguish between different
- * kinds of locks. Short term and long term locks look
- * the same to the lock table, but are handled differently
- * by the lock chain manager. This function allows the
- * client to use different lockmethods when acquiring/releasing
- * short term and long term locks, yet store them all in one hashtable.
+ * NOTES: This function makes it possible to have different lockmethodids,
+ * and hence different locking semantics, while still storing all
+ * the data in one shared-memory hashtable.
*/
-LOCKMETHOD
-LockMethodTableRename(LOCKMETHOD lockmethod)
+LOCKMETHODID
+LockMethodTableRename(LOCKMETHODID lockmethodid)
{
- LOCKMETHOD newLockMethod;
+ LOCKMETHODID newLockMethodId;
if (NumLockMethods >= MAX_LOCK_METHODS)
return INVALID_LOCKMETHOD;
- if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
+ if (LockMethods[lockmethodid] == INVALID_LOCKMETHOD)
return INVALID_LOCKMETHOD;
/* other modules refer to the lock table by a lockmethod ID */
- newLockMethod = NumLockMethods;
+ newLockMethodId = NumLockMethods;
NumLockMethods++;
- LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
- return newLockMethod;
+ LockMethods[newLockMethodId] = LockMethods[lockmethodid];
+ LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid];
+ LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid];
+ LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid];
+
+ return newLockMethodId;
}
/*
* Returns: TRUE if lock was acquired, FALSE otherwise. Note that
* a FALSE return is to be expected if dontWait is TRUE;
* but if dontWait is FALSE, only a parameter error can cause
- * a FALSE return. (XXX probably we should just elog on parameter
+ * a FALSE return. (XXX probably we should just ereport on parameter
* errors, instead of conflating this with failure to acquire lock?)
*
* Side Effects: The lock is acquired and recorded in lock tables.
* the lock. While the lock is active other clients can still
* read and write the tuple but they can be aware that it has
* been locked at the application level by someone.
- * User locks use lock tags made of an uint16 and an uint32, for
- * example 0 and a tuple oid, or any other arbitrary pair of
- * numbers following a convention established by the application.
- * In this sense tags don't refer to tuples or database entities.
+ *
* User locks and normal locks are completely orthogonal and
- * they don't interfere with each other, so it is possible
- * to acquire a normal lock on an user-locked tuple or user-lock
- * a tuple for which a normal write lock already exists.
+ * they don't interfere with each other.
+ *
* User locks are always non blocking, therefore they are never
* acquired if already held by another process. They must be
* released explicitly by the application but they are released
* automatically when a backend terminates.
* They are indicated by a lockmethod 2 which is an alias for the
- * normal lock table, and are distinguished from normal locks
- * by the following differences:
- *
- * normal lock user lock
- *
- * lockmethod 1 2
- * tag.dbId database oid database oid
- * tag.relId rel oid or 0 0
- * tag.objId block id lock id2
- * or xact id
- * tag.offnum 0 lock id1
- * proclock.xid xid or 0 0
- * persistence transaction user or backend
- * or backend
+ * normal lock table.
*
* The lockmode parameter can have the same values for normal locks
* although probably only WRITE_LOCK can have some practical use.
*/
bool
-LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode, bool dontWait)
{
+ LOCALLOCKTAG localtag;
+ LOCALLOCK *locallock;
+ LOCK *lock;
PROCLOCK *proclock;
PROCLOCKTAG proclocktag;
- HTAB *proclockTable;
bool found;
- LOCK *lock;
+ ResourceOwner owner;
LWLockId masterLock;
- LOCKMETHODTABLE *lockMethodTable;
+ LockMethod lockMethodTable;
int status;
int myHolding[MAX_LOCKMODES];
int i;
#ifdef LOCK_DEBUG
- if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
- elog(LOG, "LockAcquire: user lock [%u] %s",
- locktag->objId.blkno, lock_mode_names[lockmode]);
+ if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
+ elog(LOG, "LockAcquire: user lock [%u,%u] %s",
+ locktag->locktag_field1, locktag->locktag_field2,
+ lock_mode_names[lockmode]);
#endif
- /* ???????? This must be changed when short term locks will be used */
- locktag->lockmethod = lockmethod;
+ /* ugly */
+ locktag->locktag_lockmethodid = lockmethodid;
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
+ Assert(lockmethodid < NumLockMethods);
+ lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable)
{
- elog(WARNING, "LockAcquire: bad lock table %d", lockmethod);
+ elog(WARNING, "bad lock table id: %d", lockmethodid);
return FALSE;
}
+ /* Session locks and user locks are not transactional */
+ if (xid != InvalidTransactionId &&
+ lockmethodid == DEFAULT_LOCKMETHOD)
+ owner = CurrentResourceOwner;
+ else
+ owner = NULL;
+
+ /*
+ * Find or create a LOCALLOCK entry for this lock and lockmode
+ */
+ MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
+ localtag.lock = *locktag;
+ localtag.xid = xid;
+ localtag.mode = lockmode;
+
+ locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+ (void *) &localtag,
+ HASH_ENTER, &found);
+ if (!locallock)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+
+ /*
+ * if it's a new locallock object, initialize it
+ */
+ if (!found)
+ {
+ locallock->lock = NULL;
+ locallock->proclock = NULL;
+ locallock->nLocks = 0;
+ locallock->numLockOwners = 0;
+ locallock->maxLockOwners = 8;
+ locallock->lockOwners = NULL;
+ locallock->lockOwners = (LOCALLOCKOWNER *)
+ MemoryContextAlloc(TopMemoryContext,
+ locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
+ }
+ else
+ {
+ /* Make sure there will be room to remember the lock */
+ if (locallock->numLockOwners >= locallock->maxLockOwners)
+ {
+ int newsize = locallock->maxLockOwners * 2;
+
+ locallock->lockOwners = (LOCALLOCKOWNER *)
+ repalloc(locallock->lockOwners,
+ newsize * sizeof(LOCALLOCKOWNER));
+ locallock->maxLockOwners = newsize;
+ }
+ }
+
+ /*
+ * If we already hold the lock, we can just increase the count
+ * locally.
+ */
+ if (locallock->nLocks > 0)
+ {
+ GrantLockLocal(locallock, owner);
+ return TRUE;
+ }
+
+ /*
+ * Otherwise we've got to mess with the shared lock table.
+ */
masterLock = lockMethodTable->masterLock;
LWLockAcquire(masterLock, LW_EXCLUSIVE);
/*
- * Find or create a lock with this tag
+ * Find or create a lock with this tag.
+ *
+ * Note: if the locallock object already existed, it might have a pointer
+ * to the lock already ... but we probably should not assume that that
+ * pointer is valid, since a lock object with no locks can go away
+ * anytime.
*/
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+ lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
(void *) locktag,
HASH_ENTER, &found);
if (!lock)
{
LWLockRelease(masterLock);
- elog(ERROR, "LockAcquire: lock table %d is out of memory",
- lockmethod);
- return FALSE;
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You may need to increase max_locks_per_transaction.")));
}
+ locallock->lock = lock;
/*
* if it's a new lock object, initialize it
{
lock->grantMask = 0;
lock->waitMask = 0;
- SHMQueueInit(&(lock->lockHolders));
+ SHMQueueInit(&(lock->procLocks));
ProcQueueInit(&(lock->waitProcs));
lock->nRequested = 0;
lock->nGranted = 0;
/*
* Create the hash key for the proclock table.
*/
- MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
- * needed */
+ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
proclocktag.proc = MAKE_OFFSET(MyProc);
TransactionIdStore(xid, &proclocktag.xid);
/*
* Find or create a proclock entry with this tag
*/
- proclockTable = lockMethodTable->proclockHash;
- proclock = (PROCLOCK *) hash_search(proclockTable,
- (void *) &proclocktag,
- HASH_ENTER, &found);
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &proclocktag,
+ HASH_ENTER, &found);
if (!proclock)
{
+ /* Ooops, not enough shmem for the proclock */
+ if (lock->nRequested == 0)
+ {
+ /*
+ * There are no other requestors of this lock, so garbage-collect
+ * the lock object. We *must* do this to avoid a permanent leak
+ * of shared memory, because there won't be anything to cause
+ * anyone to release the lock object later.
+ */
+ Assert(SHMQueueEmpty(&(lock->procLocks)));
+ lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+ (void *) &(lock->tag),
+ HASH_REMOVE, NULL);
+ }
LWLockRelease(masterLock);
- elog(ERROR, "LockAcquire: proclock table out of memory");
- return FALSE;
+ if (!lock) /* hash remove failed? */
+ elog(WARNING, "lock table corrupted");
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of shared memory"),
+ errhint("You may need to increase max_locks_per_transaction.")));
}
+ locallock->proclock = proclock;
/*
* If new, initialize the new entry
*/
if (!found)
{
- proclock->nHolding = 0;
- MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES);
+ proclock->holdMask = 0;
/* Add proclock to appropriate lists */
- SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink);
- SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink);
+ SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
+ SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
PROCLOCK_PRINT("LockAcquire: new", proclock);
}
else
{
PROCLOCK_PRINT("LockAcquire: found", proclock);
- Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
- Assert(proclock->nHolding <= lock->nGranted);
+ Assert((proclock->holdMask & ~lock->grantMask) == 0);
#ifdef CHECK_DEADLOCK_RISK
*/
for (i = lockMethodTable->numLockModes; i > 0; i--)
{
- if (proclock->holding[i] > 0)
+ if (proclock->holdMask & LOCKBIT_ON(i))
{
if (i >= (int) lockmode)
break; /* safe: we have a lock >= req level */
- elog(LOG, "Deadlock risk: raising lock level"
+ elog(LOG, "deadlock risk: raising lock level"
" from %s to %s on object %u/%u/%u",
lock_mode_names[i], lock_mode_names[lockmode],
lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
/*
- * If I already hold one or more locks of the requested type, just
- * grant myself another one without blocking.
- */
- if (proclock->holding[lockmode] > 0)
- {
- GrantLock(lock, proclock, lockmode);
- PROCLOCK_PRINT("LockAcquire: owning", proclock);
- LWLockRelease(masterLock);
- return TRUE;
- }
-
- /*
- * If this process (under any XID) is a proclock of the lock, also grant
+ * If this process (under any XID) is a holder of the lock, just grant
* myself another one without blocking.
*/
LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
if (myHolding[lockmode] > 0)
{
GrantLock(lock, proclock, lockmode);
+ GrantLockLocal(locallock, owner);
PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
LWLockRelease(masterLock);
return TRUE;
{
/* No conflict with held or previously requested locks */
GrantLock(lock, proclock, lockmode);
+ GrantLockLocal(locallock, owner);
}
else
{
/*
* We can't acquire the lock immediately. If caller specified no
- * blocking, remove the proclock entry and return FALSE without
+ * blocking, remove useless table entries and return FALSE without
* waiting.
*/
if (dontWait)
{
- if (proclock->nHolding == 0)
+ if (proclock->holdMask == 0)
{
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
- proclock = (PROCLOCK *) hash_search(proclockTable,
- (void *) proclock,
- HASH_REMOVE, NULL);
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &(proclock->tag),
+ HASH_REMOVE, NULL);
if (!proclock)
- elog(WARNING, "LockAcquire: remove proclock, table corrupted");
+ elog(WARNING, "proclock table corrupted");
}
else
- PROCLOCK_PRINT("LockAcquire: NHOLDING", proclock);
+ PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
lock->nRequested--;
lock->requested[lockmode]--;
LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
Assert(lock->nGranted <= lock->nRequested);
LWLockRelease(masterLock);
+ if (locallock->nLocks == 0)
+ RemoveLocalLock(locallock);
return FALSE;
}
* Construct bitmask of locks this process holds on this object.
*/
{
- int heldLocks = 0;
- int tmpMask;
+ LOCKMASK heldLocks = 0;
- for (i = 1, tmpMask = 2;
- i <= lockMethodTable->numLockModes;
- i++, tmpMask <<= 1)
+ for (i = 1; i <= lockMethodTable->numLockModes; i++)
{
if (myHolding[i] > 0)
- heldLocks |= tmpMask;
+ heldLocks |= LOCKBIT_ON(i);
}
MyProc->heldLocks = heldLocks;
}
/*
* Sleep till someone wakes me up.
*/
- status = WaitOnLock(lockmethod, lockmode, lock, proclock);
+ status = WaitOnLock(lockmethodid, locallock, owner);
/*
* NOTE: do not do any material change of state between here and
* Check the proclock entry status, in case something in the ipc
* communication doesn't work correctly.
*/
- if (!((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0)))
+ if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
return status == STATUS_OK;
}
+/*
+ * Subroutine to free a locallock entry
+ */
+static void
+RemoveLocalLock(LOCALLOCK *locallock)
+{
+ LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
+
+ pfree(locallock->lockOwners);
+ locallock->lockOwners = NULL;
+ locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+ (void *) &(locallock->tag),
+ HASH_REMOVE, NULL);
+ if (!locallock)
+ elog(WARNING, "locallock table corrupted");
+}
+
/*
* LockCheckConflicts -- test whether requested lock conflicts
* with those already granted
* known. If NULL is passed then these values will be computed internally.
*/
int
-LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock,
PROCLOCK *proclock,
int *myHolding) /* myHolding[] array or NULL */
{
int numLockModes = lockMethodTable->numLockModes;
- int bitmask;
- int i,
- tmpMask;
+ LOCKMASK bitmask;
+ int i;
int localHolding[MAX_LOCKMODES];
/*
/* Compute mask of lock types held by other processes */
bitmask = 0;
- tmpMask = 2;
- for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
+ for (i = 1; i <= numLockModes; i++)
{
if (lock->granted[i] != myHolding[i])
- bitmask |= tmpMask;
+ bitmask |= LOCKBIT_ON(i);
}
/*
static void
LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
{
- SHM_QUEUE *procHolders = &(proc->procHolders);
+ SHM_QUEUE *procLocks = &(proc->procLocks);
PROCLOCK *proclock;
- int i;
MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
- proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
- offsetof(PROCLOCK, procLink));
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
while (proclock)
{
if (lockOffset == proclock->tag.lock)
{
+ LOCKMASK holdMask = proclock->holdMask;
+ int i;
+
for (i = 1; i < MAX_LOCKMODES; i++)
- myHolding[i] += proclock->holding[i];
+ {
+ if (holdMask & LOCKBIT_ON(i))
+ myHolding[i]++;
+ }
}
- proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
}
}
* the lock request has been granted.
*
* NOTE: if proc was blocked, it also needs to be removed from the wait list
- * and have its waitLock/waitHolder fields cleared. That's not done here.
+ * and have its waitLock/waitProcLock fields cleared. That's not done here.
+ *
+ * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
+ * table entry; but since we may be awaking some other process, we can't do
+ * that here; it's done by GrantLockLocal, instead.
*/
void
GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
{
lock->nGranted++;
lock->granted[lockmode]++;
- lock->grantMask |= BITS_ON[lockmode];
+ lock->grantMask |= LOCKBIT_ON(lockmode);
if (lock->granted[lockmode] == lock->requested[lockmode])
- lock->waitMask &= BITS_OFF[lockmode];
+ lock->waitMask &= LOCKBIT_OFF(lockmode);
+ proclock->holdMask |= LOCKBIT_ON(lockmode);
LOCK_PRINT("GrantLock", lock, lockmode);
Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
Assert(lock->nGranted <= lock->nRequested);
- proclock->holding[lockmode]++;
- proclock->nHolding++;
- Assert((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0));
+}
+
+/*
+ * UnGrantLock -- opposite of GrantLock.
+ *
+ * Updates the lock and proclock data structures to show that the lock
+ * is no longer held nor requested by the current holder.
+ *
+ * Returns true if there were any waiters waiting on the lock that
+ * should now be woken up with ProcLockWakeup.
+ */
+static bool
+UnGrantLock(LOCK *lock, LOCKMODE lockmode,
+ PROCLOCK *proclock, LockMethod lockMethodTable)
+{
+ bool wakeupNeeded = false;
+
+ Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
+ Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
+ Assert(lock->nGranted <= lock->nRequested);
+
+ /*
+ * fix the general lock stats
+ */
+ lock->nRequested--;
+ lock->requested[lockmode]--;
+ lock->nGranted--;
+ lock->granted[lockmode]--;
+
+ if (lock->granted[lockmode] == 0)
+ {
+ /* change the conflict mask. No more of this lock type. */
+ lock->grantMask &= LOCKBIT_OFF(lockmode);
+ }
+
+ LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
+
+ /*
+ * We need only run ProcLockWakeup if the released lock conflicts with
+ * at least one of the lock types requested by waiter(s). Otherwise
+ * whatever conflict made them wait must still exist. NOTE: before
+ * MVCC, we could skip wakeup if lock->granted[lockmode] was still
+ * positive. But that's not true anymore, because the remaining
+ * granted locks might belong to some waiter, who could now be
+ * awakened because he doesn't conflict with his own locks.
+ */
+ if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
+ wakeupNeeded = true;
+
+ /*
+ * Now fix the per-proclock state.
+ */
+ proclock->holdMask &= LOCKBIT_OFF(lockmode);
+ PROCLOCK_PRINT("UnGrantLock: updated", proclock);
+
+ return wakeupNeeded;
+}
+
+/*
+ * GrantLockLocal -- update the locallock data structures to show
+ * the lock request has been granted.
+ *
+ * We expect that LockAcquire made sure there is room to add a new
+ * ResourceOwner entry.
+ */
+static void
+GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
+{
+ LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+ int i;
+
+ Assert(locallock->numLockOwners < locallock->maxLockOwners);
+ /* Count the total */
+ locallock->nLocks++;
+ /* Count the per-owner lock */
+ for (i = 0; i < locallock->numLockOwners; i++)
+ {
+ if (lockOwners[i].owner == owner)
+ {
+ lockOwners[i].nLocks++;
+ return;
+ }
+ }
+ lockOwners[i].owner = owner;
+ lockOwners[i].nLocks = 1;
+ locallock->numLockOwners++;
+}
+
+/*
+ * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
+ * WaitOnLock on.
+ *
+ * proc.c needs this for the case where we are booted off the lock by
+ * timeout, but discover that someone granted us the lock anyway.
+ *
+ * We could just export GrantLockLocal, but that would require including
+ * resowner.h in lock.h, which creates circularity.
+ */
+void
+GrantAwaitedLock(void)
+{
+ GrantLockLocal(awaitedLock, awaitedOwner);
}
/*
* The locktable's masterLock must be held at entry.
*/
static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
- LOCK *lock, PROCLOCK *proclock)
+WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
+ ResourceOwner owner)
{
- LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
+ LockMethod lockMethodTable = LockMethods[lockmethodid];
char *new_status,
*old_status;
+ size_t len;
- Assert(lockmethod < NumLockMethods);
+ Assert(lockmethodid < NumLockMethods);
- LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
+ LOCK_PRINT("WaitOnLock: sleeping on lock",
+ locallock->lock, locallock->tag.mode);
old_status = pstrdup(get_ps_display());
- new_status = (char *) palloc(strlen(old_status) + 10);
- strcpy(new_status, old_status);
- strcat(new_status, " waiting");
+ len = strlen(old_status);
+ new_status = (char *) palloc(len + 8 + 1);
+ memcpy(new_status, old_status, len);
+ strcpy(new_status + len, " waiting");
set_ps_display(new_status);
+ awaitedLock = locallock;
+ awaitedOwner = owner;
+
/*
* NOTE: Think not to put any shared-state cleanup after the call to
* ProcSleep, in either the normal or failure path. The lock state
*/
if (ProcSleep(lockMethodTable,
- lockmode,
- lock,
- proclock) != STATUS_OK)
+ locallock->tag.mode,
+ locallock->lock,
+ locallock->proclock) != STATUS_OK)
{
/*
* We failed as a result of a deadlock, see CheckDeadLock(). Quit
- * now. Removal of the proclock and lock objects, if no longer
- * needed, will happen in xact cleanup (see above for motivation).
+ * now.
*/
- LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
+ awaitedLock = NULL;
+ LOCK_PRINT("WaitOnLock: aborting on lock",
+ locallock->lock, locallock->tag.mode);
LWLockRelease(lockMethodTable->masterLock);
+
/*
- * Now that we aren't holding the LockMgrLock, print details about
- * the detected deadlock. We didn't want to do this before because
- * sending elog messages to the client while holding the shared lock
- * is bad for concurrency.
+ * Now that we aren't holding the LockMgrLock, we can give an
+ * error report including details about the detected deadlock.
*/
DeadLockReport();
- elog(ERROR, "deadlock detected");
/* not reached */
}
+ awaitedLock = NULL;
+
set_ps_display(old_status);
pfree(old_status);
pfree(new_status);
- LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
+ LOCK_PRINT("WaitOnLock: wakeup on lock",
+ locallock->lock, locallock->tag.mode);
return STATUS_OK;
}
*
* Locktable lock must be held by caller.
*
- * NB: this does not remove the process' proclock object, nor the lock object,
- * even though their counts might now have gone to zero. That will happen
- * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup. (Removal of a proc from its wait queue by
- * this routine can only happen if we are aborting the transaction.)
+ * NB: this does not clean up any locallock object that may exist for the lock.
*/
void
RemoveFromWaitQueue(PGPROC *proc)
{
LOCK *waitLock = proc->waitLock;
+ PROCLOCK *proclock = proc->waitProcLock;
LOCKMODE lockmode = proc->waitLockMode;
+ LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
/* Make sure proc is waiting */
Assert(proc->links.next != INVALID_OFFSET);
Assert(waitLock);
Assert(waitLock->waitProcs.size > 0);
+ Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
/* Remove proc from lock's wait queue */
SHMQueueDelete(&(proc->links));
waitLock->requested[lockmode]--;
/* don't forget to clear waitMask bit if appropriate */
if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
- waitLock->waitMask &= BITS_OFF[lockmode];
+ waitLock->waitMask &= LOCKBIT_OFF(lockmode);
/* Clean up the proc's own state */
proc->waitLock = NULL;
- proc->waitHolder = NULL;
+ proc->waitProcLock = NULL;
+
+ /*
+ * Delete the proclock immediately if it represents no already-held locks.
+ * This must happen now because if the owner of the lock decides to release
+ * it, and the requested/granted counts then go to zero, LockRelease
+ * expects there to be no remaining proclocks.
+ */
+ if (proclock->holdMask == 0)
+ {
+ PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock);
+ SHMQueueDelete(&proclock->lockLink);
+ SHMQueueDelete(&proclock->procLink);
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &(proclock->tag),
+ HASH_REMOVE, NULL);
+ if (!proclock)
+ elog(WARNING, "proclock table corrupted");
+ }
+
+ /*
+ * There should still be some requests for the lock ... else what were
+ * we waiting for? Therefore no need to delete the lock object.
+ */
+ Assert(waitLock->nRequested > 0);
/* See if any other waiters for the lock can be woken up now */
- ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
+ ProcLockWakeup(LockMethods[lockmethodid], waitLock);
}
/*
- * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
+ * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
* release one 'lockmode' lock on it.
*
* Side Effects: find any waiting processes that are now wakable,
* come along and request the lock.)
*/
bool
-LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
+LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
TransactionId xid, LOCKMODE lockmode)
{
+ LOCALLOCKTAG localtag;
+ LOCALLOCK *locallock;
LOCK *lock;
- LWLockId masterLock;
- LOCKMETHODTABLE *lockMethodTable;
PROCLOCK *proclock;
- PROCLOCKTAG proclocktag;
- HTAB *proclockTable;
- bool wakeupNeeded = false;
+ LWLockId masterLock;
+ LockMethod lockMethodTable;
+ bool wakeupNeeded;
#ifdef LOCK_DEBUG
- if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
- elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
+ if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
+ elog(LOG, "LockRelease: user lock [%u,%u] %s",
+ locktag->locktag_field1, locktag->locktag_field2,
+ lock_mode_names[lockmode]);
#endif
- /* ???????? This must be changed when short term locks will be used */
- locktag->lockmethod = lockmethod;
+ /* ugly */
+ locktag->locktag_lockmethodid = lockmethodid;
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
+ Assert(lockmethodid < NumLockMethods);
+ lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable)
{
elog(WARNING, "lockMethodTable is null in LockRelease");
return FALSE;
}
- masterLock = lockMethodTable->masterLock;
- LWLockAcquire(masterLock, LW_EXCLUSIVE);
-
/*
- * Find a lock with this tag
+ * Find the LOCALLOCK entry for this lock and lockmode
*/
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash,
- (void *) locktag,
- HASH_FIND, NULL);
+ MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
+ localtag.lock = *locktag;
+ localtag.xid = xid;
+ localtag.mode = lockmode;
+
+ locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
+ (void *) &localtag,
+ HASH_FIND, NULL);
/*
* let the caller print its own error message, too. Do not
- * elog(ERROR).
+ * ereport(ERROR).
*/
- if (!lock)
+ if (!locallock || locallock->nLocks <= 0)
{
- LWLockRelease(masterLock);
- elog(WARNING, "LockRelease: no such lock");
+ elog(WARNING, "you don't own a lock of type %s",
+ lock_mode_names[lockmode]);
return FALSE;
}
- LOCK_PRINT("LockRelease: found", lock, lockmode);
/*
- * Find the proclock entry for this proclock.
+ * Decrease the count for the resource owner.
*/
- MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
- * needed */
- proclocktag.lock = MAKE_OFFSET(lock);
- proclocktag.proc = MAKE_OFFSET(MyProc);
- TransactionIdStore(xid, &proclocktag.xid);
-
- proclockTable = lockMethodTable->proclockHash;
- proclock = (PROCLOCK *) hash_search(proclockTable,
- (void *) &proclocktag,
- HASH_FIND_SAVE, NULL);
- if (!proclock)
{
- LWLockRelease(masterLock);
-#ifdef USER_LOCKS
- if (lockmethod == USER_LOCKMETHOD)
- elog(WARNING, "LockRelease: no lock with this tag");
+ LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+ ResourceOwner owner;
+ int i;
+
+ /* Session locks and user locks are not transactional */
+ if (xid != InvalidTransactionId &&
+ lockmethodid == DEFAULT_LOCKMETHOD)
+ owner = CurrentResourceOwner;
else
-#endif
- elog(WARNING, "LockRelease: proclock table corrupted");
- return FALSE;
+ owner = NULL;
+
+ for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ {
+ if (lockOwners[i].owner == owner)
+ {
+ Assert(lockOwners[i].nLocks > 0);
+ if (--lockOwners[i].nLocks == 0)
+ {
+ /* compact out unused slot */
+ locallock->numLockOwners--;
+ if (i < locallock->numLockOwners)
+ lockOwners[i] = lockOwners[locallock->numLockOwners];
+ }
+ break;
+ }
+ }
+ if (i < 0)
+ {
+ /* don't release a lock belonging to another owner */
+ elog(WARNING, "you don't own a lock of type %s",
+ lock_mode_names[lockmode]);
+ return FALSE;
+ }
}
- PROCLOCK_PRINT("LockRelease: found", proclock);
/*
- * Check that we are actually holding a lock of the type we want to
- * release.
+ * Decrease the total local count. If we're still holding the lock,
+ * we're done.
*/
- if (!(proclock->holding[lockmode] > 0))
- {
- PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
- Assert(proclock->holding[lockmode] >= 0);
- LWLockRelease(masterLock);
- elog(WARNING, "LockRelease: you don't own a lock of type %s",
- lock_mode_names[lockmode]);
- return FALSE;
- }
- Assert(proclock->nHolding > 0);
- Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
- Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
- Assert(lock->nGranted <= lock->nRequested);
+ locallock->nLocks--;
+
+ if (locallock->nLocks > 0)
+ return TRUE;
/*
- * fix the general lock stats
+ * Otherwise we've got to mess with the shared lock table.
*/
- lock->nRequested--;
- lock->requested[lockmode]--;
- lock->nGranted--;
- lock->granted[lockmode]--;
-
- if (lock->granted[lockmode] == 0)
- {
- /* change the conflict mask. No more of this lock type. */
- lock->grantMask &= BITS_OFF[lockmode];
- }
+ masterLock = lockMethodTable->masterLock;
- LOCK_PRINT("LockRelease: updated", lock, lockmode);
- Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
- Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
- Assert(lock->nGranted <= lock->nRequested);
+ LWLockAcquire(masterLock, LW_EXCLUSIVE);
/*
- * We need only run ProcLockWakeup if the released lock conflicts with
- * at least one of the lock types requested by waiter(s). Otherwise
- * whatever conflict made them wait must still exist. NOTE: before
- * MVCC, we could skip wakeup if lock->granted[lockmode] was still
- * positive. But that's not true anymore, because the remaining
- * granted locks might belong to some waiter, who could now be
- * awakened because he doesn't conflict with his own locks.
+ * We don't need to re-find the lock or proclock, since we kept their
+ * addresses in the locallock table, and they couldn't have been
+ * removed while we were holding a lock on them.
*/
- if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
- wakeupNeeded = true;
+ lock = locallock->lock;
+ LOCK_PRINT("LockRelease: found", lock, lockmode);
+ proclock = locallock->proclock;
+ PROCLOCK_PRINT("LockRelease: found", proclock);
- if (lock->nRequested == 0)
+ /*
+ * Double-check that we are actually holding a lock of the type we
+ * want to release.
+ */
+ if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
- /*
- * if there's no one waiting in the queue, we just released the
- * last lock on this object. Delete it from the lock table.
- */
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash,
- (void *) &(lock->tag),
- HASH_REMOVE,
- NULL);
- if (!lock)
- {
- LWLockRelease(masterLock);
- elog(WARNING, "LockRelease: remove lock, table corrupted");
- return FALSE;
- }
- wakeupNeeded = false; /* should be false, but make sure */
+ PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
+ LWLockRelease(masterLock);
+ elog(WARNING, "you don't own a lock of type %s",
+ lock_mode_names[lockmode]);
+ RemoveLocalLock(locallock);
+ return FALSE;
}
- /*
- * Now fix the per-proclock lock stats.
- */
- proclock->holding[lockmode]--;
- proclock->nHolding--;
- PROCLOCK_PRINT("LockRelease: updated", proclock);
- Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
+ wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
/*
* If this was my last hold on this lock, delete my entry in the
* proclock table.
*/
- if (proclock->nHolding == 0)
+ if (proclock->holdMask == 0)
{
- PROCLOCK_PRINT("LockRelease: deleting", proclock);
+ PROCLOCK_PRINT("LockRelease: deleting proclock", proclock);
SHMQueueDelete(&proclock->lockLink);
SHMQueueDelete(&proclock->procLink);
- proclock = (PROCLOCK *) hash_search(proclockTable,
- (void *) &proclock,
- HASH_REMOVE_SAVED, NULL);
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &(proclock->tag),
+ HASH_REMOVE, NULL);
if (!proclock)
{
LWLockRelease(masterLock);
- elog(WARNING, "LockRelease: remove proclock, table corrupted");
+ elog(WARNING, "proclock table corrupted");
+ RemoveLocalLock(locallock);
return FALSE;
}
}
- /*
- * Wake up waiters if needed.
- */
- if (wakeupNeeded)
- ProcLockWakeup(lockMethodTable, lock);
+ if (lock->nRequested == 0)
+ {
+ /*
+ * We've just released the last lock, so garbage-collect the lock
+ * object.
+ */
+ LOCK_PRINT("LockRelease: deleting lock", lock, lockmode);
+ Assert(SHMQueueEmpty(&(lock->procLocks)));
+ lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
+ (void *) &(lock->tag),
+ HASH_REMOVE, NULL);
+ if (!lock)
+ {
+ LWLockRelease(masterLock);
+ elog(WARNING, "lock table corrupted");
+ RemoveLocalLock(locallock);
+ return FALSE;
+ }
+ }
+ else
+ {
+ /*
+ * Wake up waiters if needed.
+ */
+ if (wakeupNeeded)
+ ProcLockWakeup(lockMethodTable, lock);
+ }
LWLockRelease(masterLock);
+
+ RemoveLocalLock(locallock);
return TRUE;
}
/*
- * LockReleaseAll -- Release all locks in a process's lock list.
+ * LockReleaseAll -- Release all locks of the specified lock method that
+ * are held by the current process.
*
- * Well, not really *all* locks.
+ * Well, not necessarily *all* locks. The available behaviors are:
*
- * If 'allxids' is TRUE, all locks of the specified lock method are
- * released, regardless of transaction affiliation.
+ * allxids == true: release all locks regardless of transaction
+ * affiliation.
*
- * If 'allxids' is FALSE, all locks of the specified lock method and
- * specified XID are released.
+ * allxids == false: release all locks with Xid != 0
+ * (zero is the Xid used for "session" locks).
*/
bool
-LockReleaseAll(LOCKMETHOD lockmethod, PGPROC *proc,
- bool allxids, TransactionId xid)
+LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
{
- SHM_QUEUE *procHolders = &(proc->procHolders);
- PROCLOCK *proclock;
- PROCLOCK *nextHolder;
+ HASH_SEQ_STATUS status;
+ SHM_QUEUE *procLocks = &(MyProc->procLocks);
LWLockId masterLock;
- LOCKMETHODTABLE *lockMethodTable;
+ LockMethod lockMethodTable;
int i,
numLockModes;
+ LOCALLOCK *locallock;
+ PROCLOCK *proclock;
LOCK *lock;
#ifdef LOCK_DEBUG
- if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
- elog(LOG, "LockReleaseAll: lockmethod=%d, pid=%d",
- lockmethod, proc->pid);
+ if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+ elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
#endif
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
+ Assert(lockmethodid < NumLockMethods);
+ lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable)
{
- elog(WARNING, "LockReleaseAll: bad lockmethod %d", lockmethod);
+ elog(WARNING, "bad lock method: %d", lockmethodid);
return FALSE;
}
numLockModes = lockMethodTable->numLockModes;
masterLock = lockMethodTable->masterLock;
+ /*
+ * First we run through the locallock table and get rid of unwanted
+ * entries, then we scan the process's proclocks and get rid of those.
+ * We do this separately because we may have multiple locallock
+ * entries pointing to the same proclock, and we daren't end up with
+ * any dangling pointers.
+ */
+ hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
+
+ while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ {
+ if (locallock->proclock == NULL || locallock->lock == NULL)
+ {
+ /*
+ * We must've run out of shared memory while trying to set up
+ * this lock. Just forget the local entry.
+ */
+ Assert(locallock->nLocks == 0);
+ RemoveLocalLock(locallock);
+ continue;
+ }
+
+ /* Ignore items that are not of the lockmethod to be removed */
+ if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
+ continue;
+
+ /*
+ * Ignore locks with Xid=0 unless we are asked to release all
+ * locks
+ */
+ if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
+ && !allxids)
+ continue;
+
+ RemoveLocalLock(locallock);
+ }
+
LWLockAcquire(masterLock, LW_EXCLUSIVE);
- proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
- offsetof(PROCLOCK, procLink));
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
while (proclock)
{
bool wakeupNeeded = false;
+ PROCLOCK *nextHolder;
/* Get link first, since we may unlink/delete this proclock */
- nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+ nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
- Assert(proclock->tag.proc == MAKE_OFFSET(proc));
+ Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
/* Ignore items that are not of the lockmethod to be removed */
- if (LOCK_LOCKMETHOD(*lock) != lockmethod)
+ if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
goto next_item;
- /* If not allxids, ignore items that are of the wrong xid */
- if (!allxids && !TransactionIdEquals(xid, proclock->tag.xid))
+ /*
+ * Ignore locks with Xid=0 unless we are asked to release all
+ * locks
+ */
+ if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
+ && !allxids)
goto next_item;
PROCLOCK_PRINT("LockReleaseAll", proclock);
Assert(lock->nRequested >= 0);
Assert(lock->nGranted >= 0);
Assert(lock->nGranted <= lock->nRequested);
- Assert(proclock->nHolding >= 0);
- Assert(proclock->nHolding <= lock->nRequested);
+ Assert((proclock->holdMask & ~lock->grantMask) == 0);
/*
* fix the general lock stats
*/
- if (lock->nRequested != proclock->nHolding)
- {
- for (i = 1; i <= numLockModes; i++)
- {
- Assert(proclock->holding[i] >= 0);
- if (proclock->holding[i] > 0)
- {
- lock->requested[i] -= proclock->holding[i];
- lock->granted[i] -= proclock->holding[i];
- Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
- if (lock->granted[i] == 0)
- lock->grantMask &= BITS_OFF[i];
-
- /*
- * Read comments in LockRelease
- */
- if (!wakeupNeeded &&
- lockMethodTable->conflictTab[i] & lock->waitMask)
- wakeupNeeded = true;
- }
- }
- lock->nRequested -= proclock->nHolding;
- lock->nGranted -= proclock->nHolding;
- Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
- Assert(lock->nGranted <= lock->nRequested);
- }
- else
+ if (proclock->holdMask)
{
- /*
- * This proclock accounts for all the requested locks on the
- * object, so we can be lazy and just zero things out.
- */
- lock->nRequested = 0;
- lock->nGranted = 0;
- /* Fix the lock status, just for next LOCK_PRINT message. */
for (i = 1; i <= numLockModes; i++)
{
- Assert(lock->requested[i] == lock->granted[i]);
- lock->requested[i] = lock->granted[i] = 0;
+ if (proclock->holdMask & LOCKBIT_ON(i))
+ wakeupNeeded |= UnGrantLock(lock, i, proclock,
+ lockMethodTable);
}
}
+ Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
+ Assert(lock->nGranted <= lock->nRequested);
LOCK_PRINT("LockReleaseAll: updated", lock, 0);
PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
/*
* remove the proclock entry from the hashtable
*/
- proclock = (PROCLOCK *) hash_search(lockMethodTable->proclockHash,
- (void *) proclock,
- HASH_REMOVE,
- NULL);
+ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
+ (void *) &(proclock->tag),
+ HASH_REMOVE,
+ NULL);
if (!proclock)
{
LWLockRelease(masterLock);
- elog(WARNING, "LockReleaseAll: proclock table corrupted");
+ elog(WARNING, "proclock table corrupted");
return FALSE;
}
* lock object.
*/
LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
- Assert(lockMethodTable->lockHash->hash == tag_hash);
- lock = (LOCK *) hash_search(lockMethodTable->lockHash,
+ Assert(SHMQueueEmpty(&(lock->procLocks)));
+ lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
(void *) &(lock->tag),
HASH_REMOVE, NULL);
if (!lock)
{
LWLockRelease(masterLock);
- elog(WARNING, "LockReleaseAll: cannot remove lock from HTAB");
+ elog(WARNING, "lock table corrupted");
return FALSE;
}
}
LWLockRelease(masterLock);
#ifdef LOCK_DEBUG
- if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
- elog(LOG, "LockReleaseAll: done");
+ if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+ elog(LOG, "LockReleaseAll done");
#endif
return TRUE;
}
+/*
+ * LockReleaseCurrentOwner
+ * Release all locks belonging to CurrentResourceOwner
+ *
+ * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner.
+ */
+void
+LockReleaseCurrentOwner(void)
+{
+ HASH_SEQ_STATUS status;
+ LOCALLOCK *locallock;
+ LOCALLOCKOWNER *lockOwners;
+ int i;
+
+ hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
+
+ while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ {
+ /* Ignore items that must be nontransactional */
+ if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
+ continue;
+ if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
+ continue;
+
+ /* Scan to see if there are any locks belonging to current owner */
+ lockOwners = locallock->lockOwners;
+ for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ {
+ if (lockOwners[i].owner == CurrentResourceOwner)
+ {
+ Assert(lockOwners[i].nLocks > 0);
+ if (lockOwners[i].nLocks < locallock->nLocks)
+ {
+ /*
+ * We will still hold this lock after forgetting this
+ * ResourceOwner.
+ */
+ locallock->nLocks -= lockOwners[i].nLocks;
+ /* compact out unused slot */
+ locallock->numLockOwners--;
+ if (i < locallock->numLockOwners)
+ lockOwners[i] = lockOwners[locallock->numLockOwners];
+ }
+ else
+ {
+ Assert(lockOwners[i].nLocks == locallock->nLocks);
+ /* We want to call LockRelease just once */
+ lockOwners[i].nLocks = 1;
+ locallock->nLocks = 1;
+ if (!LockRelease(DEFAULT_LOCKMETHOD,
+ &locallock->tag.lock,
+ locallock->tag.xid,
+ locallock->tag.mode))
+ elog(WARNING, "LockReleaseCurrentOwner: failed??");
+ }
+ break;
+ }
+ }
+ }
+}
+
+/*
+ * LockReassignCurrentOwner
+ * Reassign all locks belonging to CurrentResourceOwner to belong
+ * to its parent resource owner
+ */
+void
+LockReassignCurrentOwner(void)
+{
+ ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
+ HASH_SEQ_STATUS status;
+ LOCALLOCK *locallock;
+ LOCALLOCKOWNER *lockOwners;
+
+ Assert(parent != NULL);
+
+ hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
+
+ while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
+ {
+ int i;
+ int ic = -1;
+ int ip = -1;
+
+ /* Ignore items that must be nontransactional */
+ if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
+ continue;
+ if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
+ continue;
+
+ /*
+ * Scan to see if there are any locks belonging to current owner
+ * or its parent
+ */
+ lockOwners = locallock->lockOwners;
+ for (i = locallock->numLockOwners - 1; i >= 0; i--)
+ {
+ if (lockOwners[i].owner == CurrentResourceOwner)
+ ic = i;
+ else if (lockOwners[i].owner == parent)
+ ip = i;
+ }
+
+ if (ic < 0)
+ continue; /* no current locks */
+
+ if (ip < 0)
+ {
+ /* Parent has no slot, so just give it child's slot */
+ lockOwners[ic].owner = parent;
+ }
+ else
+ {
+ /* Merge child's count with parent's */
+ lockOwners[ip].nLocks += lockOwners[ic].nLocks;
+ /* compact out unused slot */
+ locallock->numLockOwners--;
+ if (ic < locallock->numLockOwners)
+ lockOwners[ic] = lockOwners[locallock->numLockOwners];
+ }
+ }
+}
+
+
+/*
+ * Estimate shared-memory space used for lock tables
+ */
int
LockShmemSize(int maxBackends)
{
int size = 0;
long max_table_size = NLOCKENTS(maxBackends);
- size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
- size += maxBackends * MAXALIGN(sizeof(PGPROC)); /* each MyProc */
- size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODTABLE)); /* each lockMethodTable */
+ /* lock method headers */
+ size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
/* lockHash table */
size += hash_estimate_size(max_table_size, sizeof(LOCK));
size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
/*
+ * Note we count only one pair of hash tables, since the userlocks
+ * table actually overlays the main one.
+ *
* Since the lockHash entry count above is only an estimate, add 10%
* safety margin.
*/
LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
- proclockTable = LockMethodTable[DEFAULT_LOCKMETHOD]->proclockHash;
+ proclockTable = LockMethodProcLockHash[DEFAULT_LOCKMETHOD];
data->nelements = i = proclockTable->hctl->nentries;
- if (i == 0)
- i = 1; /* avoid palloc(0) if empty table */
-
data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
#ifdef LOCK_DEBUG
/*
- * Dump all locks in the proc->procHolders list.
+ * Dump all locks in the MyProc->procLocks list.
*
* Must have already acquired the masterLock.
*/
DumpLocks(void)
{
PGPROC *proc;
- SHM_QUEUE *procHolders;
+ SHM_QUEUE *procLocks;
PROCLOCK *proclock;
LOCK *lock;
- int lockmethod = DEFAULT_LOCKMETHOD;
- LOCKMETHODTABLE *lockMethodTable;
+ int lockmethodid = DEFAULT_LOCKMETHOD;
+ LockMethod lockMethodTable;
proc = MyProc;
if (proc == NULL)
return;
- procHolders = &proc->procHolders;
+ procLocks = &proc->procLocks;
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
+ Assert(lockmethodid < NumLockMethods);
+ lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable)
return;
if (proc->waitLock)
LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
- proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
- offsetof(PROCLOCK, procLink));
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
+ offsetof(PROCLOCK, procLink));
while (proclock)
{
PROCLOCK_PRINT("DumpLocks", proclock);
LOCK_PRINT("DumpLocks", lock, 0);
- proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
+ proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
offsetof(PROCLOCK, procLink));
}
}
PGPROC *proc;
PROCLOCK *proclock;
LOCK *lock;
- int lockmethod = DEFAULT_LOCKMETHOD;
- LOCKMETHODTABLE *lockMethodTable;
+ int lockmethodid = DEFAULT_LOCKMETHOD;
+ LockMethod lockMethodTable;
HTAB *proclockTable;
HASH_SEQ_STATUS status;
if (proc == NULL)
return;
- Assert(lockmethod < NumLockMethods);
- lockMethodTable = LockMethodTable[lockmethod];
+ Assert(lockmethodid < NumLockMethods);
+ lockMethodTable = LockMethods[lockmethodid];
if (!lockMethodTable)
return;
- proclockTable = lockMethodTable->proclockHash;
+ proclockTable = LockMethodProcLockHash[lockmethodid];
if (proc->waitLock)
LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);