X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fstorage%2Flmgr%2Flock.c;h=576fc205299eebe48047cab9f3dcdf5169ab35fe;hb=3a694bb0a16fea1662f1ffd31506a72effdd4a93;hp=ba84f525c7b14f955073adc5f48144df071755b7;hpb=f7387474941036c212fd8084cfa546931a227144;p=postgresql diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index ba84f525c7..576fc20529 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -3,12 +3,12 @@ * lock.c * POSTGRES low-level lock mechanism * - * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.97 2001/09/29 21:35:14 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.150 2005/04/29 22:28:24 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -23,36 +23,48 @@ * Interface: * * LockAcquire(), LockRelease(), LockMethodTableInit(), - * LockMethodTableRename(), LockReleaseAll, + * LockMethodTableRename(), LockReleaseAll(), * LockCheckConflicts(), GrantLock() * *------------------------------------------------------------------------- */ #include "postgres.h" -#include -#include #include +#include #include "access/xact.h" #include "miscadmin.h" #include "storage/proc.h" #include "utils/memutils.h" #include "utils/ps_status.h" +#include "utils/resowner.h" /* This configuration variable is used to set the lock table size */ -int max_locks_per_xact; /* set by guc.c */ +int max_locks_per_xact; /* set by guc.c */ #define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends)) -static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, - LOCK *lock, PROCLOCK *holder); -static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, - int *myHolding); +/* + * map from lock method id to the lock table data structures + */ +static LockMethod LockMethods[MAX_LOCK_METHODS]; +static HTAB *LockMethodLockHash[MAX_LOCK_METHODS]; +static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS]; +static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS]; + +/* exported so lmgr.c can initialize it */ +int NumLockMethods; + -static char *lock_mode_names[] = +/* private state for GrantAwaitedLock */ +static LOCALLOCK *awaitedLock; +static ResourceOwner awaitedOwner; + + +static const char *const lock_mode_names[] = { "INVALID", "AccessShareLock", @@ -85,7 +97,7 @@ static char *lock_mode_names[] = * -------- */ -int Trace_lock_oidmin = BootstrapObjectIdData; +int Trace_lock_oidmin = FirstNormalObjectId; bool Trace_locks = false; bool Trace_userlocks = false; int Trace_lock_table = 0; @@ -96,10 +108,11 @@ inline static bool LOCK_DEBUG_ENABLED(const LOCK *lock) { return - (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks) - || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks)) - && (lock->tag.relId >= (Oid) Trace_lock_oidmin)) - || (Trace_lock_table && (lock->tag.relId == Trace_lock_table)); + (((Trace_locks && LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD) + || (Trace_userlocks && LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD)) + && ((Oid) lock->tag.locktag_field2 >= (Oid) Trace_lock_oidmin)) + || (Trace_lock_table + && (lock->tag.locktag_field2 == Trace_lock_table)); } @@ -107,13 +120,15 @@ inline static void LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type) { if (LOCK_DEBUG_ENABLED(lock)) - elog(DEBUG, - "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) " + elog(LOG, + "%s: lock(%lx) id(%u,%u,%u,%u,%u,%u) grantMask(%x) " "req(%d,%d,%d,%d,%d,%d,%d)=%d " "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", where, MAKE_OFFSET(lock), - lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId, - lock->tag.objId.blkno, lock->grantMask, + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3, lock->tag.locktag_field4, + lock->tag.locktag_type, lock->tag.locktag_lockmethodid, + lock->grantMask, lock->requested[1], lock->requested[2], lock->requested[3], lock->requested[4], lock->requested[5], lock->requested[6], lock->requested[7], lock->nRequested, @@ -125,48 +140,33 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type) inline static void -PROCLOCK_PRINT(const char *where, const PROCLOCK *holderP) +PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) { - if ( - (((PROCLOCK_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks) - || (PROCLOCK_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks)) - && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin)) - || (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table)) - ) - elog(DEBUG, - "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d", - where, MAKE_OFFSET(holderP), holderP->tag.lock, - PROCLOCK_LOCKMETHOD(*(holderP)), - holderP->tag.proc, holderP->tag.xid, - holderP->holding[1], holderP->holding[2], holderP->holding[3], - holderP->holding[4], holderP->holding[5], holderP->holding[6], - holderP->holding[7], holderP->nHolding); + if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock))) + elog(LOG, + "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)", + where, MAKE_OFFSET(proclockP), proclockP->tag.lock, + PROCLOCK_LOCKMETHOD(*(proclockP)), + proclockP->tag.proc, proclockP->tag.xid, + (int) proclockP->holdMask); } #else /* not LOCK_DEBUG */ #define LOCK_PRINT(where, lock, type) -#define PROCLOCK_PRINT(where, holderP) - -#endif /* not LOCK_DEBUG */ +#define PROCLOCK_PRINT(where, proclockP) +#endif /* not LOCK_DEBUG */ -/* - * These are to simplify/speed up some bit arithmetic. - * - * XXX is a fetch from a static array really faster than a shift? - * Wouldn't bet on it... - */ - -static LOCKMASK BITS_OFF[MAX_LOCKMODES]; -static LOCKMASK BITS_ON[MAX_LOCKMODES]; - -/* - * map from lockmethod to the lock table structure - */ -static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS]; +static void RemoveLocalLock(LOCALLOCK *locallock); +static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); +static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, + ResourceOwner owner); +static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, + int *myHolding); +static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, + PROCLOCK *proclock, LockMethod lockMethodTable); -static int NumLockMethods; /* * InitLocks -- Init the lock module. Create a private data @@ -175,28 +175,20 @@ static int NumLockMethods; void InitLocks(void) { - int i; - int bit; - - bit = 1; - for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1) - { - BITS_ON[i] = bit; - BITS_OFF[i] = ~bit; - } + /* NOP */ } /* * Fetch the lock method table associated with a given lock */ -LOCKMETHODTABLE * +LockMethod GetLocksMethodTable(LOCK *lock) { - LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock); + LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock); - Assert(lockmethod > 0 && lockmethod < NumLockMethods); - return LockMethodTable[lockmethod]; + Assert(0 < lockmethodid && lockmethodid < NumLockMethods); + return LockMethods[lockmethodid]; } @@ -207,20 +199,16 @@ GetLocksMethodTable(LOCK *lock) * Notes: just copying. Should only be called once. */ static void -LockMethodInit(LOCKMETHODTABLE *lockMethodTable, - LOCKMASK *conflictsP, - int *prioP, +LockMethodInit(LockMethod lockMethodTable, + const LOCKMASK *conflictsP, int numModes) { int i; - lockMethodTable->ctl->numLockModes = numModes; - numModes++; - for (i = 0; i < numModes; i++, prioP++, conflictsP++) - { - lockMethodTable->ctl->conflictTab[i] = *conflictsP; - lockMethodTable->ctl->prio[i] = *prioP; - } + lockMethodTable->numLockModes = numModes; + /* copies useless zero element as well as the N lockmodes */ + for (i = 0; i <= numModes; i++) + lockMethodTable->conflictTab[i] = conflictsP[i]; } /* @@ -232,14 +220,14 @@ LockMethodInit(LOCKMETHODTABLE *lockMethodTable, * by the postmaster are inherited by each backend, so they must be in * TopMemoryContext. */ -LOCKMETHOD -LockMethodTableInit(char *tabName, - LOCKMASK *conflictsP, - int *prioP, +LOCKMETHODID +LockMethodTableInit(const char *tabName, + const LOCKMASK *conflictsP, int numModes, int maxBackends) { - LOCKMETHODTABLE *lockMethodTable; + LockMethod newLockMethod; + LOCKMETHODID lockmethodid; char *shmemName; HASHCTL info; int hash_flags; @@ -248,140 +236,139 @@ LockMethodTableInit(char *tabName, max_table_size; if (numModes >= MAX_LOCKMODES) - { - elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d", - numModes, MAX_LOCKMODES); - return INVALID_LOCKMETHOD; - } + elog(ERROR, "too many lock types %d (limit is %d)", + numModes, MAX_LOCKMODES - 1); /* Compute init/max size to request for lock hashtables */ max_table_size = NLOCKENTS(maxBackends); - init_table_size = max_table_size / 10; + init_table_size = max_table_size / 2; /* Allocate a string for the shmem index table lookups. */ /* This is just temp space in this routine, so palloc is OK. */ shmemName = (char *) palloc(strlen(tabName) + 32); - /* each lock table has a non-shared, permanent header */ - lockMethodTable = (LOCKMETHODTABLE *) - MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODTABLE)); + /* each lock table has a header in shared memory */ + sprintf(shmemName, "%s (lock method table)", tabName); + newLockMethod = (LockMethod) + ShmemInitStruct(shmemName, sizeof(LockMethodData), &found); - /* - * Lock the LWLock for the table (probably not necessary here) - */ - LWLockAcquire(LockMgrLock, LW_EXCLUSIVE); - - /* - * allocate a control structure from shared memory or attach to it if - * it already exists. - */ - sprintf(shmemName, "%s (ctl)", tabName); - lockMethodTable->ctl = (LOCKMETHODCTL *) - ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found); - - if (!lockMethodTable->ctl) - elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName); - - /* - * no zero-th table - */ - NumLockMethods = 1; + if (!newLockMethod) + elog(FATAL, "could not initialize lock table \"%s\"", tabName); /* * we're first - initialize */ if (!found) { - MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL)); - lockMethodTable->ctl->masterLock = LockMgrLock; - lockMethodTable->ctl->lockmethod = NumLockMethods; + MemSet(newLockMethod, 0, sizeof(LockMethodData)); + newLockMethod->masterLock = LockMgrLock; + LockMethodInit(newLockMethod, conflictsP, numModes); } /* * other modules refer to the lock table by a lockmethod ID */ - LockMethodTable[NumLockMethods] = lockMethodTable; - NumLockMethods++; - Assert(NumLockMethods <= MAX_LOCK_METHODS); + Assert(NumLockMethods < MAX_LOCK_METHODS); + lockmethodid = NumLockMethods++; + LockMethods[lockmethodid] = newLockMethod; /* * allocate a hash table for LOCK structs. This is used to store * per-locked-object information. */ - info.keysize = SHMEM_LOCKTAB_KEYSIZE; - info.datasize = SHMEM_LOCKTAB_DATASIZE; + MemSet(&info, 0, sizeof(info)); + info.keysize = sizeof(LOCKTAG); + info.entrysize = sizeof(LOCK); info.hash = tag_hash; hash_flags = (HASH_ELEM | HASH_FUNCTION); sprintf(shmemName, "%s (lock hash)", tabName); - lockMethodTable->lockHash = ShmemInitHash(shmemName, - init_table_size, - max_table_size, - &info, - hash_flags); + LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName, + init_table_size, + max_table_size, + &info, + hash_flags); - if (!lockMethodTable->lockHash) - elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName); - Assert(lockMethodTable->lockHash->hash == tag_hash); + if (!LockMethodLockHash[lockmethodid]) + elog(FATAL, "could not initialize lock table \"%s\"", tabName); /* - * allocate a hash table for PROCLOCK structs. This is used to store + * allocate a hash table for PROCLOCK structs. This is used to store * per-lock-holder information. */ - info.keysize = SHMEM_PROCLOCKTAB_KEYSIZE; - info.datasize = SHMEM_PROCLOCKTAB_DATASIZE; + info.keysize = sizeof(PROCLOCKTAG); + info.entrysize = sizeof(PROCLOCK); info.hash = tag_hash; hash_flags = (HASH_ELEM | HASH_FUNCTION); - sprintf(shmemName, "%s (holder hash)", tabName); - lockMethodTable->holderHash = ShmemInitHash(shmemName, - init_table_size, - max_table_size, - &info, - hash_flags); + sprintf(shmemName, "%s (proclock hash)", tabName); + LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName, + init_table_size, + max_table_size, + &info, + hash_flags); - if (!lockMethodTable->holderHash) - elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName); + if (!LockMethodProcLockHash[lockmethodid]) + elog(FATAL, "could not initialize lock table \"%s\"", tabName); - /* init ctl data structures */ - LockMethodInit(lockMethodTable, conflictsP, prioP, numModes); + /* + * allocate a non-shared hash table for LOCALLOCK structs. This is + * used to store lock counts and resource owner information. + * + * The non-shared table could already exist in this process (this occurs + * when the postmaster is recreating shared memory after a backend + * crash). If so, delete and recreate it. (We could simply leave it, + * since it ought to be empty in the postmaster, but for safety let's + * zap it.) + */ + if (LockMethodLocalHash[lockmethodid]) + hash_destroy(LockMethodLocalHash[lockmethodid]); - LWLockRelease(LockMgrLock); + info.keysize = sizeof(LOCALLOCKTAG); + info.entrysize = sizeof(LOCALLOCK); + info.hash = tag_hash; + hash_flags = (HASH_ELEM | HASH_FUNCTION); + + sprintf(shmemName, "%s (locallock hash)", tabName); + LockMethodLocalHash[lockmethodid] = hash_create(shmemName, + 128, + &info, + hash_flags); pfree(shmemName); - return lockMethodTable->ctl->lockmethod; + return lockmethodid; } /* * LockMethodTableRename -- allocate another lockmethod ID to the same * lock table. * - * NOTES: Both the lock module and the lock chain (lchain.c) - * module use table id's to distinguish between different - * kinds of locks. Short term and long term locks look - * the same to the lock table, but are handled differently - * by the lock chain manager. This function allows the - * client to use different lockmethods when acquiring/releasing - * short term and long term locks, yet store them all in one hashtable. + * NOTES: This function makes it possible to have different lockmethodids, + * and hence different locking semantics, while still storing all + * the data in one shared-memory hashtable. */ -LOCKMETHOD -LockMethodTableRename(LOCKMETHOD lockmethod) +LOCKMETHODID +LockMethodTableRename(LOCKMETHODID lockmethodid) { - LOCKMETHOD newLockMethod; + LOCKMETHODID newLockMethodId; if (NumLockMethods >= MAX_LOCK_METHODS) return INVALID_LOCKMETHOD; - if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD) + if (LockMethods[lockmethodid] == INVALID_LOCKMETHOD) return INVALID_LOCKMETHOD; /* other modules refer to the lock table by a lockmethod ID */ - newLockMethod = NumLockMethods; + newLockMethodId = NumLockMethods; NumLockMethods++; - LockMethodTable[newLockMethod] = LockMethodTable[lockmethod]; - return newLockMethod; + LockMethods[newLockMethodId] = LockMethods[lockmethodid]; + LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid]; + LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid]; + LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid]; + + return newLockMethodId; } /* @@ -391,7 +378,7 @@ LockMethodTableRename(LOCKMETHOD lockmethod) * Returns: TRUE if lock was acquired, FALSE otherwise. Note that * a FALSE return is to be expected if dontWait is TRUE; * but if dontWait is FALSE, only a parameter error can cause - * a FALSE return. (XXX probably we should just elog on parameter + * a FALSE return. (XXX probably we should just ereport on parameter * errors, instead of conflating this with failure to acquire lock?) * * Side Effects: The lock is acquired and recorded in lock tables. @@ -411,33 +398,16 @@ LockMethodTableRename(LOCKMETHOD lockmethod) * the lock. While the lock is active other clients can still * read and write the tuple but they can be aware that it has * been locked at the application level by someone. - * User locks use lock tags made of an uint16 and an uint32, for - * example 0 and a tuple oid, or any other arbitrary pair of - * numbers following a convention established by the application. - * In this sense tags don't refer to tuples or database entities. + * * User locks and normal locks are completely orthogonal and - * they don't interfere with each other, so it is possible - * to acquire a normal lock on an user-locked tuple or user-lock - * a tuple for which a normal write lock already exists. + * they don't interfere with each other. + * * User locks are always non blocking, therefore they are never * acquired if already held by another process. They must be * released explicitly by the application but they are released * automatically when a backend terminates. * They are indicated by a lockmethod 2 which is an alias for the - * normal lock table, and are distinguished from normal locks - * by the following differences: - * - * normal lock user lock - * - * lockmethod 1 2 - * tag.dbId database oid database oid - * tag.relId rel oid or 0 0 - * tag.objId block id lock id2 - * or xact id - * tag.offnum 0 lock id1 - * holder.xid xid or 0 0 - * persistence transaction user or backend - * or backend + * normal lock table. * * The lockmode parameter can have the same values for normal locks * although probably only WRITE_LOCK can have some practical use. @@ -446,53 +416,129 @@ LockMethodTableRename(LOCKMETHOD lockmethod) */ bool -LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, +LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode, bool dontWait) { - PROCLOCK *holder; - PROCLOCKTAG holdertag; - HTAB *holderTable; - bool found; + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; LOCK *lock; + PROCLOCK *proclock; + PROCLOCKTAG proclocktag; + bool found; + ResourceOwner owner; LWLockId masterLock; - LOCKMETHODTABLE *lockMethodTable; + LockMethod lockMethodTable; int status; int myHolding[MAX_LOCKMODES]; int i; #ifdef LOCK_DEBUG - if (lockmethod == USER_LOCKMETHOD && Trace_userlocks) - elog(DEBUG, "LockAcquire: user lock [%u] %s", - locktag->objId.blkno, lock_mode_names[lockmode]); + if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD) + elog(LOG, "LockAcquire: user lock [%u,%u] %s", + locktag->locktag_field1, locktag->locktag_field2, + lock_mode_names[lockmode]); #endif - /* ???????? This must be changed when short term locks will be used */ - locktag->lockmethod = lockmethod; + /* ugly */ + locktag->locktag_lockmethodid = lockmethodid; - Assert(lockmethod < NumLockMethods); - lockMethodTable = LockMethodTable[lockmethod]; + Assert(lockmethodid < NumLockMethods); + lockMethodTable = LockMethods[lockmethodid]; if (!lockMethodTable) { - elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod); + elog(WARNING, "bad lock table id: %d", lockmethodid); return FALSE; } - masterLock = lockMethodTable->ctl->masterLock; + /* Session locks and user locks are not transactional */ + if (xid != InvalidTransactionId && + lockmethodid == DEFAULT_LOCKMETHOD) + owner = CurrentResourceOwner; + else + owner = NULL; + + /* + * Find or create a LOCALLOCK entry for this lock and lockmode + */ + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.xid = xid; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &localtag, + HASH_ENTER, &found); + if (!locallock) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + + /* + * if it's a new locallock object, initialize it + */ + if (!found) + { + locallock->lock = NULL; + locallock->proclock = NULL; + locallock->nLocks = 0; + locallock->numLockOwners = 0; + locallock->maxLockOwners = 8; + locallock->lockOwners = NULL; + locallock->lockOwners = (LOCALLOCKOWNER *) + MemoryContextAlloc(TopMemoryContext, + locallock->maxLockOwners * sizeof(LOCALLOCKOWNER)); + } + else + { + /* Make sure there will be room to remember the lock */ + if (locallock->numLockOwners >= locallock->maxLockOwners) + { + int newsize = locallock->maxLockOwners * 2; + + locallock->lockOwners = (LOCALLOCKOWNER *) + repalloc(locallock->lockOwners, + newsize * sizeof(LOCALLOCKOWNER)); + locallock->maxLockOwners = newsize; + } + } + + /* + * If we already hold the lock, we can just increase the count + * locally. + */ + if (locallock->nLocks > 0) + { + GrantLockLocal(locallock, owner); + return TRUE; + } + + /* + * Otherwise we've got to mess with the shared lock table. + */ + masterLock = lockMethodTable->masterLock; LWLockAcquire(masterLock, LW_EXCLUSIVE); /* - * Find or create a lock with this tag + * Find or create a lock with this tag. + * + * Note: if the locallock object already existed, it might have a pointer + * to the lock already ... but we probably should not assume that that + * pointer is valid, since a lock object with no locks can go away + * anytime. */ - Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, + lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], + (void *) locktag, HASH_ENTER, &found); if (!lock) { LWLockRelease(masterLock); - elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod); - return FALSE; + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You may need to increase max_locks_per_transaction."))); } + locallock->lock = lock; /* * if it's a new lock object, initialize it @@ -501,7 +547,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, { lock->grantMask = 0; lock->waitMask = 0; - SHMQueueInit(&(lock->lockHolders)); + SHMQueueInit(&(lock->procLocks)); ProcQueueInit(&(lock->waitProcs)); lock->nRequested = 0; lock->nGranted = 0; @@ -518,44 +564,60 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, } /* - * Create the hash key for the holder table. + * Create the hash key for the proclock table. */ - MemSet(&holdertag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, - * needed */ - holdertag.lock = MAKE_OFFSET(lock); - holdertag.proc = MAKE_OFFSET(MyProc); - TransactionIdStore(xid, &holdertag.xid); + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ + proclocktag.lock = MAKE_OFFSET(lock); + proclocktag.proc = MAKE_OFFSET(MyProc); + TransactionIdStore(xid, &proclocktag.xid); /* - * Find or create a holder entry with this tag + * Find or create a proclock entry with this tag */ - holderTable = lockMethodTable->holderHash; - holder = (PROCLOCK *) hash_search(holderTable, (Pointer) &holdertag, - HASH_ENTER, &found); - if (!holder) + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &proclocktag, + HASH_ENTER, &found); + if (!proclock) { + /* Ooops, not enough shmem for the proclock */ + if (lock->nRequested == 0) + { + /* + * There are no other requestors of this lock, so garbage-collect + * the lock object. We *must* do this to avoid a permanent leak + * of shared memory, because there won't be anything to cause + * anyone to release the lock object later. + */ + Assert(SHMQueueEmpty(&(lock->procLocks))); + lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], + (void *) &(lock->tag), + HASH_REMOVE, NULL); + } LWLockRelease(masterLock); - elog(FATAL, "LockAcquire: holder table corrupted"); - return FALSE; + if (!lock) /* hash remove failed? */ + elog(WARNING, "lock table corrupted"); + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of shared memory"), + errhint("You may need to increase max_locks_per_transaction."))); } + locallock->proclock = proclock; /* * If new, initialize the new entry */ if (!found) { - holder->nHolding = 0; - MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES); - /* Add holder to appropriate lists */ - SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink); - SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink); - PROCLOCK_PRINT("LockAcquire: new", holder); + proclock->holdMask = 0; + /* Add proclock to appropriate lists */ + SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); + SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink); + PROCLOCK_PRINT("LockAcquire: new", proclock); } else { - PROCLOCK_PRINT("LockAcquire: found", holder); - Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0)); - Assert(holder->nHolding <= lock->nGranted); + PROCLOCK_PRINT("LockAcquire: found", proclock); + Assert((proclock->holdMask & ~lock->grantMask) == 0); #ifdef CHECK_DEADLOCK_RISK @@ -574,20 +636,20 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * XXX Doing numeric comparison on the lockmodes is a hack; it'd be * better to use a table. For now, though, this works. */ - for (i = lockMethodTable->ctl->numLockModes; i > 0; i--) + for (i = lockMethodTable->numLockModes; i > 0; i--) { - if (holder->holding[i] > 0) + if (proclock->holdMask & LOCKBIT_ON(i)) { if (i >= (int) lockmode) break; /* safe: we have a lock >= req level */ - elog(DEBUG, "Deadlock risk: raising lock level" + elog(LOG, "deadlock risk: raising lock level" " from %s to %s on object %u/%u/%u", lock_mode_names[i], lock_mode_names[lockmode], lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno); break; } } -#endif /* CHECK_DEADLOCK_RISK */ +#endif /* CHECK_DEADLOCK_RISK */ } /* @@ -600,26 +662,15 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* - * If I already hold one or more locks of the requested type, just - * grant myself another one without blocking. - */ - if (holder->holding[lockmode] > 0) - { - GrantLock(lock, holder, lockmode); - PROCLOCK_PRINT("LockAcquire: owning", holder); - LWLockRelease(masterLock); - return TRUE; - } - - /* - * If this process (under any XID) is a holder of the lock, also grant + * If this process (under any XID) is a holder of the lock, just grant * myself another one without blocking. */ - LockCountMyLocks(holder->tag.lock, MyProc, myHolding); + LockCountMyLocks(proclock->tag.lock, MyProc, myHolding); if (myHolding[lockmode] > 0) { - GrantLock(lock, holder, lockmode); - PROCLOCK_PRINT("LockAcquire: my other XID owning", holder); + GrantLock(lock, proclock, lockmode); + GrantLockLocal(locallock, owner); + PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock); LWLockRelease(masterLock); return TRUE; } @@ -629,45 +680,50 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * join wait queue. Otherwise, check for conflict with already-held * locks. (That's last because most complex check.) */ - if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) + if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) status = STATUS_FOUND; else status = LockCheckConflicts(lockMethodTable, lockmode, - lock, holder, + lock, proclock, MyProc, myHolding); if (status == STATUS_OK) { /* No conflict with held or previously requested locks */ - GrantLock(lock, holder, lockmode); + GrantLock(lock, proclock, lockmode); + GrantLockLocal(locallock, owner); } else { Assert(status == STATUS_FOUND); + /* * We can't acquire the lock immediately. If caller specified no - * blocking, remove the holder entry and return FALSE without waiting. + * blocking, remove useless table entries and return FALSE without + * waiting. */ if (dontWait) { - if (holder->nHolding == 0) + if (proclock->holdMask == 0) { - SHMQueueDelete(&holder->lockLink); - SHMQueueDelete(&holder->procLink); - holder = (PROCLOCK *) hash_search(holderTable, - (Pointer) holder, - HASH_REMOVE, &found); - if (!holder || !found) - elog(NOTICE, "LockAcquire: remove holder, table corrupted"); + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), + HASH_REMOVE, NULL); + if (!proclock) + elog(WARNING, "proclock table corrupted"); } else - PROCLOCK_PRINT("LockAcquire: NHOLDING", holder); + PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); lock->nRequested--; lock->requested[lockmode]--; LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); LWLockRelease(masterLock); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); return FALSE; } @@ -675,15 +731,12 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * Construct bitmask of locks this process holds on this object. */ { - int heldLocks = 0; - int tmpMask; + LOCKMASK heldLocks = 0; - for (i = 1, tmpMask = 2; - i <= lockMethodTable->ctl->numLockModes; - i++, tmpMask <<= 1) + for (i = 1; i <= lockMethodTable->numLockModes; i++) { if (myHolding[i] > 0) - heldLocks |= tmpMask; + heldLocks |= LOCKBIT_ON(i); } MyProc->heldLocks = heldLocks; } @@ -691,7 +744,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, /* * Sleep till someone wakes me up. */ - status = WaitOnLock(lockmethod, lockmode, lock, holder); + status = WaitOnLock(lockmethodid, locallock, owner); /* * NOTE: do not do any material change of state between here and @@ -701,18 +754,18 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, */ /* - * Check the holder entry status, in case something in the ipc + * Check the proclock entry status, in case something in the ipc * communication doesn't work correctly. */ - if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0))) + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { - PROCLOCK_PRINT("LockAcquire: INCONSISTENT", holder); + PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); /* Should we retry ? */ LWLockRelease(masterLock); return FALSE; } - PROCLOCK_PRINT("LockAcquire: granted", holder); + PROCLOCK_PRINT("LockAcquire: granted", proclock); LOCK_PRINT("LockAcquire: granted", lock, lockmode); } @@ -721,6 +774,23 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, return status == STATUS_OK; } +/* + * Subroutine to free a locallock entry + */ +static void +RemoveLocalLock(LOCALLOCK *locallock) +{ + LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock); + + pfree(locallock->lockOwners); + locallock->lockOwners = NULL; + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &(locallock->tag), + HASH_REMOVE, NULL); + if (!locallock) + elog(WARNING, "locallock table corrupted"); +} + /* * LockCheckConflicts -- test whether requested lock conflicts * with those already granted @@ -738,18 +808,16 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, * known. If NULL is passed then these values will be computed internally. */ int -LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, +LockCheckConflicts(LockMethod lockMethodTable, LOCKMODE lockmode, LOCK *lock, - PROCLOCK *holder, - PROC *proc, + PROCLOCK *proclock, + PGPROC *proc, int *myHolding) /* myHolding[] array or NULL */ { - LOCKMETHODCTL *lockctl = lockMethodTable->ctl; - int numLockModes = lockctl->numLockModes; - int bitmask; - int i, - tmpMask; + int numLockModes = lockMethodTable->numLockModes; + LOCKMASK bitmask; + int i; int localHolding[MAX_LOCKMODES]; /* @@ -761,9 +829,9 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, * each type of lock that conflicts with request. Bitwise compare * tells if there is a conflict. */ - if (!(lockctl->conflictTab[lockmode] & lock->grantMask)) + if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask)) { - PROCLOCK_PRINT("LockCheckConflicts: no conflict", holder); + PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock); return STATUS_OK; } @@ -776,17 +844,16 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, if (myHolding == NULL) { /* Caller didn't do calculation of total holding for me */ - LockCountMyLocks(holder->tag.lock, proc, localHolding); + LockCountMyLocks(proclock->tag.lock, proc, localHolding); myHolding = localHolding; } /* Compute mask of lock types held by other processes */ bitmask = 0; - tmpMask = 2; - for (i = 1; i <= numLockModes; i++, tmpMask <<= 1) + for (i = 1; i <= numLockModes; i++) { if (lock->granted[i] != myHolding[i]) - bitmask |= tmpMask; + bitmask |= LOCKBIT_ON(i); } /* @@ -794,14 +861,14 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, * locks held by other processes. If one of these conflicts with the * kind of lock that I want, there is a conflict and I have to sleep. */ - if (!(lockctl->conflictTab[lockmode] & bitmask)) + if (!(lockMethodTable->conflictTab[lockmode] & bitmask)) { /* no conflict. OK to get the lock */ - PROCLOCK_PRINT("LockCheckConflicts: resolved", holder); + PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock); return STATUS_OK; } - PROCLOCK_PRINT("LockCheckConflicts: conflicting", holder); + PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock); return STATUS_FOUND; } @@ -816,51 +883,160 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable, * be a net slowdown. */ static void -LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding) +LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding) { - SHM_QUEUE *procHolders = &(proc->procHolders); - PROCLOCK *holder; - int i; + SHM_QUEUE *procLocks = &(proc->procLocks); + PROCLOCK *proclock; MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int)); - holder = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, - offsetof(PROCLOCK, procLink)); + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); - while (holder) + while (proclock) { - if (lockOffset == holder->tag.lock) + if (lockOffset == proclock->tag.lock) { + LOCKMASK holdMask = proclock->holdMask; + int i; + for (i = 1; i < MAX_LOCKMODES; i++) - myHolding[i] += holder->holding[i]; + { + if (holdMask & LOCKBIT_ON(i)) + myHolding[i]++; + } } - holder = (PROCLOCK *) SHMQueueNext(procHolders, &holder->procLink, - offsetof(PROCLOCK, procLink)); + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, + offsetof(PROCLOCK, procLink)); } } /* - * GrantLock -- update the lock and holder data structures to show + * GrantLock -- update the lock and proclock data structures to show * the lock request has been granted. * * NOTE: if proc was blocked, it also needs to be removed from the wait list - * and have its waitLock/waitHolder fields cleared. That's not done here. + * and have its waitLock/waitProcLock fields cleared. That's not done here. + * + * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK + * table entry; but since we may be awaking some other process, we can't do + * that here; it's done by GrantLockLocal, instead. */ void -GrantLock(LOCK *lock, PROCLOCK *holder, LOCKMODE lockmode) +GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) { lock->nGranted++; lock->granted[lockmode]++; - lock->grantMask |= BITS_ON[lockmode]; + lock->grantMask |= LOCKBIT_ON(lockmode); if (lock->granted[lockmode] == lock->requested[lockmode]) - lock->waitMask &= BITS_OFF[lockmode]; + lock->waitMask &= LOCKBIT_OFF(lockmode); + proclock->holdMask |= LOCKBIT_ON(lockmode); LOCK_PRINT("GrantLock", lock, lockmode); Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); Assert(lock->nGranted <= lock->nRequested); - holder->holding[lockmode]++; - holder->nHolding++; - Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0)); +} + +/* + * UnGrantLock -- opposite of GrantLock. + * + * Updates the lock and proclock data structures to show that the lock + * is no longer held nor requested by the current holder. + * + * Returns true if there were any waiters waiting on the lock that + * should now be woken up with ProcLockWakeup. + */ +static bool +UnGrantLock(LOCK *lock, LOCKMODE lockmode, + PROCLOCK *proclock, LockMethod lockMethodTable) +{ + bool wakeupNeeded = false; + + Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); + Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); + Assert(lock->nGranted <= lock->nRequested); + + /* + * fix the general lock stats + */ + lock->nRequested--; + lock->requested[lockmode]--; + lock->nGranted--; + lock->granted[lockmode]--; + + if (lock->granted[lockmode] == 0) + { + /* change the conflict mask. No more of this lock type. */ + lock->grantMask &= LOCKBIT_OFF(lockmode); + } + + LOCK_PRINT("UnGrantLock: updated", lock, lockmode); + + /* + * We need only run ProcLockWakeup if the released lock conflicts with + * at least one of the lock types requested by waiter(s). Otherwise + * whatever conflict made them wait must still exist. NOTE: before + * MVCC, we could skip wakeup if lock->granted[lockmode] was still + * positive. But that's not true anymore, because the remaining + * granted locks might belong to some waiter, who could now be + * awakened because he doesn't conflict with his own locks. + */ + if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) + wakeupNeeded = true; + + /* + * Now fix the per-proclock state. + */ + proclock->holdMask &= LOCKBIT_OFF(lockmode); + PROCLOCK_PRINT("UnGrantLock: updated", proclock); + + return wakeupNeeded; +} + +/* + * GrantLockLocal -- update the locallock data structures to show + * the lock request has been granted. + * + * We expect that LockAcquire made sure there is room to add a new + * ResourceOwner entry. + */ +static void +GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner) +{ + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + int i; + + Assert(locallock->numLockOwners < locallock->maxLockOwners); + /* Count the total */ + locallock->nLocks++; + /* Count the per-owner lock */ + for (i = 0; i < locallock->numLockOwners; i++) + { + if (lockOwners[i].owner == owner) + { + lockOwners[i].nLocks++; + return; + } + } + lockOwners[i].owner = owner; + lockOwners[i].nLocks = 1; + locallock->numLockOwners++; +} + +/* + * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing + * WaitOnLock on. + * + * proc.c needs this for the case where we are booted off the lock by + * timeout, but discover that someone granted us the lock anyway. + * + * We could just export GrantLockLocal, but that would require including + * resowner.h in lock.h, which creates circularity. + */ +void +GrantAwaitedLock(void) +{ + GrantLockLocal(awaitedLock, awaitedOwner); } /* @@ -872,27 +1048,33 @@ GrantLock(LOCK *lock, PROCLOCK *holder, LOCKMODE lockmode) * The locktable's masterLock must be held at entry. */ static int -WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, - LOCK *lock, PROCLOCK *holder) +WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, + ResourceOwner owner) { - LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod]; + LockMethod lockMethodTable = LockMethods[lockmethodid]; char *new_status, *old_status; + size_t len; - Assert(lockmethod < NumLockMethods); + Assert(lockmethodid < NumLockMethods); - LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode); + LOCK_PRINT("WaitOnLock: sleeping on lock", + locallock->lock, locallock->tag.mode); old_status = pstrdup(get_ps_display()); - new_status = (char *) palloc(strlen(old_status) + 10); - strcpy(new_status, old_status); - strcat(new_status, " waiting"); + len = strlen(old_status); + new_status = (char *) palloc(len + 8 + 1); + memcpy(new_status, old_status, len); + strcpy(new_status + len, " waiting"); set_ps_display(new_status); + awaitedLock = locallock; + awaitedOwner = owner; + /* * NOTE: Think not to put any shared-state cleanup after the call to * ProcSleep, in either the normal or failure path. The lock state - * must be fully set by the lock grantor, or by HandleDeadLock if we + * must be fully set by the lock grantor, or by CheckDeadLock if we * give up waiting for the lock. This is necessary because of the * possibility that a cancel/die interrupt will interrupt ProcSleep * after someone else grants us the lock, but before we've noticed it. @@ -904,27 +1086,35 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, */ if (ProcSleep(lockMethodTable, - lockmode, - lock, - holder) != STATUS_OK) + locallock->tag.mode, + locallock->lock, + locallock->proclock) != STATUS_OK) { + /* + * We failed as a result of a deadlock, see CheckDeadLock(). Quit + * now. + */ + awaitedLock = NULL; + LOCK_PRINT("WaitOnLock: aborting on lock", + locallock->lock, locallock->tag.mode); + LWLockRelease(lockMethodTable->masterLock); /* - * We failed as a result of a deadlock, see HandleDeadLock(). Quit - * now. Removal of the holder and lock objects, if no longer - * needed, will happen in xact cleanup (see above for motivation). + * Now that we aren't holding the LockMgrLock, we can give an + * error report including details about the detected deadlock. */ - LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode); - LWLockRelease(lockMethodTable->ctl->masterLock); - elog(ERROR, "deadlock detected"); + DeadLockReport(); /* not reached */ } + awaitedLock = NULL; + set_ps_display(old_status); pfree(old_status); pfree(new_status); - LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode); + LOCK_PRINT("WaitOnLock: wakeup on lock", + locallock->lock, locallock->tag.mode); return STATUS_OK; } @@ -934,22 +1124,21 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode, * * Locktable lock must be held by caller. * - * NB: this does not remove the process' holder object, nor the lock object, - * even though their counts might now have gone to zero. That will happen - * during a subsequent LockReleaseAll call, which we expect will happen - * during transaction cleanup. (Removal of a proc from its wait queue by - * this routine can only happen if we are aborting the transaction.) + * NB: this does not clean up any locallock object that may exist for the lock. */ void -RemoveFromWaitQueue(PROC *proc) +RemoveFromWaitQueue(PGPROC *proc) { LOCK *waitLock = proc->waitLock; + PROCLOCK *proclock = proc->waitProcLock; LOCKMODE lockmode = proc->waitLockMode; + LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock); /* Make sure proc is waiting */ Assert(proc->links.next != INVALID_OFFSET); Assert(waitLock); Assert(waitLock->waitProcs.size > 0); + Assert(0 < lockmethodid && lockmethodid < NumLockMethods); /* Remove proc from lock's wait queue */ SHMQueueDelete(&(proc->links)); @@ -963,18 +1152,42 @@ RemoveFromWaitQueue(PROC *proc) waitLock->requested[lockmode]--; /* don't forget to clear waitMask bit if appropriate */ if (waitLock->granted[lockmode] == waitLock->requested[lockmode]) - waitLock->waitMask &= BITS_OFF[lockmode]; + waitLock->waitMask &= LOCKBIT_OFF(lockmode); /* Clean up the proc's own state */ proc->waitLock = NULL; - proc->waitHolder = NULL; + proc->waitProcLock = NULL; + + /* + * Delete the proclock immediately if it represents no already-held locks. + * This must happen now because if the owner of the lock decides to release + * it, and the requested/granted counts then go to zero, LockRelease + * expects there to be no remaining proclocks. + */ + if (proclock->holdMask == 0) + { + PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock); + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), + HASH_REMOVE, NULL); + if (!proclock) + elog(WARNING, "proclock table corrupted"); + } + + /* + * There should still be some requests for the lock ... else what were + * we waiting for? Therefore no need to delete the lock object. + */ + Assert(waitLock->nRequested > 0); /* See if any other waiters for the lock can be woken up now */ - ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock); + ProcLockWakeup(LockMethods[lockmethodid], waitLock); } /* - * LockRelease -- look up 'locktag' in lock table 'lockmethod' and + * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and * release one 'lockmode' lock on it. * * Side Effects: find any waiting processes that are now wakable, @@ -984,354 +1197,362 @@ RemoveFromWaitQueue(PROC *proc) * come along and request the lock.) */ bool -LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, +LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode) { + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; LOCK *lock; + PROCLOCK *proclock; LWLockId masterLock; - bool found; - LOCKMETHODTABLE *lockMethodTable; - PROCLOCK *holder; - PROCLOCKTAG holdertag; - HTAB *holderTable; - bool wakeupNeeded = false; + LockMethod lockMethodTable; + bool wakeupNeeded; #ifdef LOCK_DEBUG - if (lockmethod == USER_LOCKMETHOD && Trace_userlocks) - elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode); + if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD) + elog(LOG, "LockRelease: user lock [%u,%u] %s", + locktag->locktag_field1, locktag->locktag_field2, + lock_mode_names[lockmode]); #endif - /* ???????? This must be changed when short term locks will be used */ - locktag->lockmethod = lockmethod; + /* ugly */ + locktag->locktag_lockmethodid = lockmethodid; - Assert(lockmethod < NumLockMethods); - lockMethodTable = LockMethodTable[lockmethod]; + Assert(lockmethodid < NumLockMethods); + lockMethodTable = LockMethods[lockmethodid]; if (!lockMethodTable) { - elog(NOTICE, "lockMethodTable is null in LockRelease"); + elog(WARNING, "lockMethodTable is null in LockRelease"); return FALSE; } - masterLock = lockMethodTable->ctl->masterLock; - LWLockAcquire(masterLock, LW_EXCLUSIVE); - /* - * Find a lock with this tag + * Find the LOCALLOCK entry for this lock and lockmode */ - Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, - HASH_FIND, &found); + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.xid = xid; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &localtag, + HASH_FIND, NULL); /* * let the caller print its own error message, too. Do not - * elog(ERROR). + * ereport(ERROR). */ - if (!lock) - { - LWLockRelease(masterLock); - elog(NOTICE, "LockRelease: locktable corrupted"); - return FALSE; - } - - if (!found) + if (!locallock || locallock->nLocks <= 0) { - LWLockRelease(masterLock); - elog(NOTICE, "LockRelease: no such lock"); + elog(WARNING, "you don't own a lock of type %s", + lock_mode_names[lockmode]); return FALSE; } - LOCK_PRINT("LockRelease: found", lock, lockmode); /* - * Find the holder entry for this holder. + * Decrease the count for the resource owner. */ - MemSet(&holdertag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, - * needed */ - holdertag.lock = MAKE_OFFSET(lock); - holdertag.proc = MAKE_OFFSET(MyProc); - TransactionIdStore(xid, &holdertag.xid); - - holderTable = lockMethodTable->holderHash; - holder = (PROCLOCK *) hash_search(holderTable, (Pointer) &holdertag, - HASH_FIND_SAVE, &found); - if (!holder || !found) { - LWLockRelease(masterLock); -#ifdef USER_LOCKS - if (!found && lockmethod == USER_LOCKMETHOD) - elog(NOTICE, "LockRelease: no lock with this tag"); + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + ResourceOwner owner; + int i; + + /* Session locks and user locks are not transactional */ + if (xid != InvalidTransactionId && + lockmethodid == DEFAULT_LOCKMETHOD) + owner = CurrentResourceOwner; else -#endif - elog(NOTICE, "LockRelease: holder table corrupted"); - return FALSE; + owner = NULL; + + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == owner) + { + Assert(lockOwners[i].nLocks > 0); + if (--lockOwners[i].nLocks == 0) + { + /* compact out unused slot */ + locallock->numLockOwners--; + if (i < locallock->numLockOwners) + lockOwners[i] = lockOwners[locallock->numLockOwners]; + } + break; + } + } + if (i < 0) + { + /* don't release a lock belonging to another owner */ + elog(WARNING, "you don't own a lock of type %s", + lock_mode_names[lockmode]); + return FALSE; + } } - PROCLOCK_PRINT("LockRelease: found", holder); /* - * Check that we are actually holding a lock of the type we want to - * release. + * Decrease the total local count. If we're still holding the lock, + * we're done. */ - if (!(holder->holding[lockmode] > 0)) - { - PROCLOCK_PRINT("LockRelease: WRONGTYPE", holder); - Assert(holder->holding[lockmode] >= 0); - LWLockRelease(masterLock); - elog(NOTICE, "LockRelease: you don't own a lock of type %s", - lock_mode_names[lockmode]); - return FALSE; - } - Assert(holder->nHolding > 0); - Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); - Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); - Assert(lock->nGranted <= lock->nRequested); + locallock->nLocks--; + + if (locallock->nLocks > 0) + return TRUE; /* - * fix the general lock stats + * Otherwise we've got to mess with the shared lock table. */ - lock->nRequested--; - lock->requested[lockmode]--; - lock->nGranted--; - lock->granted[lockmode]--; + masterLock = lockMethodTable->masterLock; - if (lock->granted[lockmode] == 0) + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + /* + * We don't need to re-find the lock or proclock, since we kept their + * addresses in the locallock table, and they couldn't have been + * removed while we were holding a lock on them. + */ + lock = locallock->lock; + LOCK_PRINT("LockRelease: found", lock, lockmode); + proclock = locallock->proclock; + PROCLOCK_PRINT("LockRelease: found", proclock); + + /* + * Double-check that we are actually holding a lock of the type we + * want to release. + */ + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { - /* change the conflict mask. No more of this lock type. */ - lock->grantMask &= BITS_OFF[lockmode]; + PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock); + LWLockRelease(masterLock); + elog(WARNING, "you don't own a lock of type %s", + lock_mode_names[lockmode]); + RemoveLocalLock(locallock); + return FALSE; } - LOCK_PRINT("LockRelease: updated", lock, lockmode); - Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); - Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); - Assert(lock->nGranted <= lock->nRequested); + wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable); /* - * We need only run ProcLockWakeup if the released lock conflicts with - * at least one of the lock types requested by waiter(s). Otherwise - * whatever conflict made them wait must still exist. NOTE: before - * MVCC, we could skip wakeup if lock->granted[lockmode] was still - * positive. But that's not true anymore, because the remaining - * granted locks might belong to some waiter, who could now be - * awakened because he doesn't conflict with his own locks. + * If this was my last hold on this lock, delete my entry in the + * proclock table. */ - if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask) - wakeupNeeded = true; - - if (lock->nRequested == 0) + if (proclock->holdMask == 0) { - - /* - * if there's no one waiting in the queue, we just released the - * last lock on this object. Delete it from the lock table. - */ - Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) hash_search(lockMethodTable->lockHash, - (Pointer) &(lock->tag), - HASH_REMOVE, - &found); - if (!lock || !found) + PROCLOCK_PRINT("LockRelease: deleting proclock", proclock); + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), + HASH_REMOVE, NULL); + if (!proclock) { LWLockRelease(masterLock); - elog(NOTICE, "LockRelease: remove lock, table corrupted"); + elog(WARNING, "proclock table corrupted"); + RemoveLocalLock(locallock); return FALSE; } - wakeupNeeded = false; /* should be false, but make sure */ } - /* - * Now fix the per-holder lock stats. - */ - holder->holding[lockmode]--; - holder->nHolding--; - PROCLOCK_PRINT("LockRelease: updated", holder); - Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0)); - - /* - * If this was my last hold on this lock, delete my entry in the - * holder table. - */ - if (holder->nHolding == 0) + if (lock->nRequested == 0) { - PROCLOCK_PRINT("LockRelease: deleting", holder); - SHMQueueDelete(&holder->lockLink); - SHMQueueDelete(&holder->procLink); - holder = (PROCLOCK *) hash_search(holderTable, (Pointer) &holder, - HASH_REMOVE_SAVED, &found); - if (!holder || !found) + /* + * We've just released the last lock, so garbage-collect the lock + * object. + */ + LOCK_PRINT("LockRelease: deleting lock", lock, lockmode); + Assert(SHMQueueEmpty(&(lock->procLocks))); + lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], + (void *) &(lock->tag), + HASH_REMOVE, NULL); + if (!lock) { LWLockRelease(masterLock); - elog(NOTICE, "LockRelease: remove holder, table corrupted"); + elog(WARNING, "lock table corrupted"); + RemoveLocalLock(locallock); return FALSE; } } - - /* - * Wake up waiters if needed. - */ - if (wakeupNeeded) - ProcLockWakeup(lockMethodTable, lock); + else + { + /* + * Wake up waiters if needed. + */ + if (wakeupNeeded) + ProcLockWakeup(lockMethodTable, lock); + } LWLockRelease(masterLock); + + RemoveLocalLock(locallock); return TRUE; } /* - * LockReleaseAll -- Release all locks in a process's lock list. + * LockReleaseAll -- Release all locks of the specified lock method that + * are held by the current process. * - * Well, not really *all* locks. + * Well, not necessarily *all* locks. The available behaviors are: * - * If 'allxids' is TRUE, all locks of the specified lock method are - * released, regardless of transaction affiliation. + * allxids == true: release all locks regardless of transaction + * affiliation. * - * If 'allxids' is FALSE, all locks of the specified lock method and - * specified XID are released. + * allxids == false: release all locks with Xid != 0 + * (zero is the Xid used for "session" locks). */ bool -LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, - bool allxids, TransactionId xid) +LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) { - SHM_QUEUE *procHolders = &(proc->procHolders); - PROCLOCK *holder; - PROCLOCK *nextHolder; + HASH_SEQ_STATUS status; + SHM_QUEUE *procLocks = &(MyProc->procLocks); LWLockId masterLock; - LOCKMETHODTABLE *lockMethodTable; + LockMethod lockMethodTable; int i, numLockModes; + LOCALLOCK *locallock; + PROCLOCK *proclock; LOCK *lock; - bool found; #ifdef LOCK_DEBUG - if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) - elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d", - lockmethod, proc->pid); + if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) + elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid); #endif - Assert(lockmethod < NumLockMethods); - lockMethodTable = LockMethodTable[lockmethod]; + Assert(lockmethodid < NumLockMethods); + lockMethodTable = LockMethods[lockmethodid]; if (!lockMethodTable) { - elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod); + elog(WARNING, "bad lock method: %d", lockmethodid); return FALSE; } - numLockModes = lockMethodTable->ctl->numLockModes; - masterLock = lockMethodTable->ctl->masterLock; + numLockModes = lockMethodTable->numLockModes; + masterLock = lockMethodTable->masterLock; + + /* + * First we run through the locallock table and get rid of unwanted + * entries, then we scan the process's proclocks and get rid of those. + * We do this separately because we may have multiple locallock + * entries pointing to the same proclock, and we daren't end up with + * any dangling pointers. + */ + hash_seq_init(&status, LockMethodLocalHash[lockmethodid]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + if (locallock->proclock == NULL || locallock->lock == NULL) + { + /* + * We must've run out of shared memory while trying to set up + * this lock. Just forget the local entry. + */ + Assert(locallock->nLocks == 0); + RemoveLocalLock(locallock); + continue; + } + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) + continue; + + /* + * Ignore locks with Xid=0 unless we are asked to release all + * locks + */ + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId) + && !allxids) + continue; + + RemoveLocalLock(locallock); + } LWLockAcquire(masterLock, LW_EXCLUSIVE); - holder = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, - offsetof(PROCLOCK, procLink)); + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); - while (holder) + while (proclock) { bool wakeupNeeded = false; + PROCLOCK *nextHolder; - /* Get link first, since we may unlink/delete this holder */ - nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &holder->procLink, - offsetof(PROCLOCK, procLink)); + /* Get link first, since we may unlink/delete this proclock */ + nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, + offsetof(PROCLOCK, procLink)); - Assert(holder->tag.proc == MAKE_OFFSET(proc)); + Assert(proclock->tag.proc == MAKE_OFFSET(MyProc)); - lock = (LOCK *) MAKE_PTR(holder->tag.lock); + lock = (LOCK *) MAKE_PTR(proclock->tag.lock); /* Ignore items that are not of the lockmethod to be removed */ - if (LOCK_LOCKMETHOD(*lock) != lockmethod) + if (LOCK_LOCKMETHOD(*lock) != lockmethodid) goto next_item; - /* If not allxids, ignore items that are of the wrong xid */ - if (!allxids && !TransactionIdEquals(xid, holder->tag.xid)) + /* + * Ignore locks with Xid=0 unless we are asked to release all + * locks + */ + if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId) + && !allxids) goto next_item; - PROCLOCK_PRINT("LockReleaseAll", holder); + PROCLOCK_PRINT("LockReleaseAll", proclock); LOCK_PRINT("LockReleaseAll", lock, 0); Assert(lock->nRequested >= 0); Assert(lock->nGranted >= 0); Assert(lock->nGranted <= lock->nRequested); - Assert(holder->nHolding >= 0); - Assert(holder->nHolding <= lock->nRequested); + Assert((proclock->holdMask & ~lock->grantMask) == 0); /* * fix the general lock stats */ - if (lock->nRequested != holder->nHolding) + if (proclock->holdMask) { for (i = 1; i <= numLockModes; i++) { - Assert(holder->holding[i] >= 0); - if (holder->holding[i] > 0) - { - lock->requested[i] -= holder->holding[i]; - lock->granted[i] -= holder->holding[i]; - Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0); - if (lock->granted[i] == 0) - lock->grantMask &= BITS_OFF[i]; - - /* - * Read comments in LockRelease - */ - if (!wakeupNeeded && - lockMethodTable->ctl->conflictTab[i] & lock->waitMask) - wakeupNeeded = true; - } - } - lock->nRequested -= holder->nHolding; - lock->nGranted -= holder->nHolding; - Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); - Assert(lock->nGranted <= lock->nRequested); - } - else - { - /* - * This holder accounts for all the requested locks on the - * object, so we can be lazy and just zero things out. - */ - lock->nRequested = 0; - lock->nGranted = 0; - /* Fix the lock status, just for next LOCK_PRINT message. */ - for (i = 1; i <= numLockModes; i++) - { - Assert(lock->requested[i] == lock->granted[i]); - lock->requested[i] = lock->granted[i] = 0; + if (proclock->holdMask & LOCKBIT_ON(i)) + wakeupNeeded |= UnGrantLock(lock, i, proclock, + lockMethodTable); } } + Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); + Assert(lock->nGranted <= lock->nRequested); LOCK_PRINT("LockReleaseAll: updated", lock, 0); - PROCLOCK_PRINT("LockReleaseAll: deleting", holder); + PROCLOCK_PRINT("LockReleaseAll: deleting", proclock); /* - * Remove the holder entry from the linked lists + * Remove the proclock entry from the linked lists */ - SHMQueueDelete(&holder->lockLink); - SHMQueueDelete(&holder->procLink); + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); /* - * remove the holder entry from the hashtable + * remove the proclock entry from the hashtable */ - holder = (PROCLOCK *) hash_search(lockMethodTable->holderHash, - (Pointer) holder, - HASH_REMOVE, - &found); - if (!holder || !found) + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), + HASH_REMOVE, + NULL); + if (!proclock) { LWLockRelease(masterLock); - elog(NOTICE, "LockReleaseAll: holder table corrupted"); + elog(WARNING, "proclock table corrupted"); return FALSE; } if (lock->nRequested == 0) { - /* * We've just released the last lock, so garbage-collect the * lock object. */ LOCK_PRINT("LockReleaseAll: deleting", lock, 0); - Assert(lockMethodTable->lockHash->hash == tag_hash); - lock = (LOCK *) hash_search(lockMethodTable->lockHash, - (Pointer) &(lock->tag), - HASH_REMOVE, &found); - if (!lock || !found) + Assert(SHMQueueEmpty(&(lock->procLocks))); + lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], + (void *) &(lock->tag), + HASH_REMOVE, NULL); + if (!lock) { LWLockRelease(masterLock); - elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB"); + elog(WARNING, "lock table corrupted"); return FALSE; } } @@ -1339,41 +1560,165 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc, ProcLockWakeup(lockMethodTable, lock); next_item: - holder = nextHolder; + proclock = nextHolder; } LWLockRelease(masterLock); #ifdef LOCK_DEBUG - if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) - elog(DEBUG, "LockReleaseAll: done"); + if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) + elog(LOG, "LockReleaseAll done"); #endif return TRUE; } +/* + * LockReleaseCurrentOwner + * Release all locks belonging to CurrentResourceOwner + * + * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner. + */ +void +LockReleaseCurrentOwner(void) +{ + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + LOCALLOCKOWNER *lockOwners; + int i; + + hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + /* Ignore items that must be nontransactional */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD) + continue; + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)) + continue; + + /* Scan to see if there are any locks belonging to current owner */ + lockOwners = locallock->lockOwners; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == CurrentResourceOwner) + { + Assert(lockOwners[i].nLocks > 0); + if (lockOwners[i].nLocks < locallock->nLocks) + { + /* + * We will still hold this lock after forgetting this + * ResourceOwner. + */ + locallock->nLocks -= lockOwners[i].nLocks; + /* compact out unused slot */ + locallock->numLockOwners--; + if (i < locallock->numLockOwners) + lockOwners[i] = lockOwners[locallock->numLockOwners]; + } + else + { + Assert(lockOwners[i].nLocks == locallock->nLocks); + /* We want to call LockRelease just once */ + lockOwners[i].nLocks = 1; + locallock->nLocks = 1; + if (!LockRelease(DEFAULT_LOCKMETHOD, + &locallock->tag.lock, + locallock->tag.xid, + locallock->tag.mode)) + elog(WARNING, "LockReleaseCurrentOwner: failed??"); + } + break; + } + } + } +} + +/* + * LockReassignCurrentOwner + * Reassign all locks belonging to CurrentResourceOwner to belong + * to its parent resource owner + */ +void +LockReassignCurrentOwner(void) +{ + ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner); + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + LOCALLOCKOWNER *lockOwners; + + Assert(parent != NULL); + + hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + int i; + int ic = -1; + int ip = -1; + + /* Ignore items that must be nontransactional */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD) + continue; + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)) + continue; + + /* + * Scan to see if there are any locks belonging to current owner + * or its parent + */ + lockOwners = locallock->lockOwners; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == CurrentResourceOwner) + ic = i; + else if (lockOwners[i].owner == parent) + ip = i; + } + + if (ic < 0) + continue; /* no current locks */ + + if (ip < 0) + { + /* Parent has no slot, so just give it child's slot */ + lockOwners[ic].owner = parent; + } + else + { + /* Merge child's count with parent's */ + lockOwners[ip].nLocks += lockOwners[ic].nLocks; + /* compact out unused slot */ + locallock->numLockOwners--; + if (ic < locallock->numLockOwners) + lockOwners[ic] = lockOwners[locallock->numLockOwners]; + } + } +} + + +/* + * Estimate shared-memory space used for lock tables + */ int LockShmemSize(int maxBackends) { int size = 0; long max_table_size = NLOCKENTS(maxBackends); - size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */ - size += maxBackends * MAXALIGN(sizeof(PROC)); /* each MyProc */ - size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each - * lockMethodTable->ctl */ + /* lock method headers */ + size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData)); /* lockHash table */ - size += hash_estimate_size(max_table_size, - SHMEM_LOCKTAB_KEYSIZE, - SHMEM_LOCKTAB_DATASIZE); + size += hash_estimate_size(max_table_size, sizeof(LOCK)); - /* holderHash table */ - size += hash_estimate_size(max_table_size, - SHMEM_PROCLOCKTAB_KEYSIZE, - SHMEM_PROCLOCKTAB_DATASIZE); + /* proclockHash table */ + size += hash_estimate_size(max_table_size, sizeof(PROCLOCK)); /* + * Note we count only one pair of hash tables, since the userlocks + * table actually overlays the main one. + * * Since the lockHash entry count above is only an estimate, add 10% * safety margin. */ @@ -1382,51 +1727,117 @@ LockShmemSize(int maxBackends) return size; } +/* + * GetLockStatusData - Return a summary of the lock manager's internal + * status, for use in a user-level reporting function. + * + * The return data consists of an array of PROCLOCK objects, with the + * associated PGPROC and LOCK objects for each. Note that multiple + * copies of the same PGPROC and/or LOCK objects are likely to appear. + * It is the caller's responsibility to match up duplicates if wanted. + * + * The design goal is to hold the LockMgrLock for as short a time as possible; + * thus, this function simply makes a copy of the necessary data and releases + * the lock, allowing the caller to contemplate and format the data for as + * long as it pleases. + */ +LockData * +GetLockStatusData(void) +{ + LockData *data; + HTAB *proclockTable; + PROCLOCK *proclock; + HASH_SEQ_STATUS seqstat; + int i; + + data = (LockData *) palloc(sizeof(LockData)); + + LWLockAcquire(LockMgrLock, LW_EXCLUSIVE); + + proclockTable = LockMethodProcLockHash[DEFAULT_LOCKMETHOD]; + + data->nelements = i = proclockTable->hctl->nentries; + + data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i); + data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i); + data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i); + data->locks = (LOCK *) palloc(sizeof(LOCK) * i); + + hash_seq_init(&seqstat, proclockTable); + + i = 0; + while ((proclock = hash_seq_search(&seqstat))) + { + PGPROC *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc); + LOCK *lock = (LOCK *) MAKE_PTR(proclock->tag.lock); + + data->proclockaddrs[i] = MAKE_OFFSET(proclock); + memcpy(&(data->proclocks[i]), proclock, sizeof(PROCLOCK)); + memcpy(&(data->procs[i]), proc, sizeof(PGPROC)); + memcpy(&(data->locks[i]), lock, sizeof(LOCK)); + + i++; + } + + LWLockRelease(LockMgrLock); + + Assert(i == data->nelements); + + return data; +} + +/* Provide the textual name of any lock mode */ +const char * +GetLockmodeName(LOCKMODE mode) +{ + Assert(mode <= MAX_LOCKMODES); + return lock_mode_names[mode]; +} #ifdef LOCK_DEBUG /* - * Dump all locks in the proc->procHolders list. + * Dump all locks in the MyProc->procLocks list. * * Must have already acquired the masterLock. */ void DumpLocks(void) { - PROC *proc; - SHM_QUEUE *procHolders; - PROCLOCK *holder; + PGPROC *proc; + SHM_QUEUE *procLocks; + PROCLOCK *proclock; LOCK *lock; - int lockmethod = DEFAULT_LOCKMETHOD; - LOCKMETHODTABLE *lockMethodTable; + int lockmethodid = DEFAULT_LOCKMETHOD; + LockMethod lockMethodTable; proc = MyProc; if (proc == NULL) return; - procHolders = &proc->procHolders; + procLocks = &proc->procLocks; - Assert(lockmethod < NumLockMethods); - lockMethodTable = LockMethodTable[lockmethod]; + Assert(lockmethodid < NumLockMethods); + lockMethodTable = LockMethods[lockmethodid]; if (!lockMethodTable) return; if (proc->waitLock) LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0); - holder = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, - offsetof(PROCLOCK, procLink)); + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, + offsetof(PROCLOCK, procLink)); - while (holder) + while (proclock) { - Assert(holder->tag.proc == MAKE_OFFSET(proc)); + Assert(proclock->tag.proc == MAKE_OFFSET(proc)); - lock = (LOCK *) MAKE_PTR(holder->tag.lock); + lock = (LOCK *) MAKE_PTR(proclock->tag.lock); - PROCLOCK_PRINT("DumpLocks", holder); + PROCLOCK_PRINT("DumpLocks", proclock); LOCK_PRINT("DumpLocks", lock, 0); - holder = (PROCLOCK *) SHMQueueNext(procHolders, &holder->procLink, - offsetof(PROCLOCK, procLink)); + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, + offsetof(PROCLOCK, procLink)); } } @@ -1436,42 +1847,41 @@ DumpLocks(void) void DumpAllLocks(void) { - PROC *proc; - PROCLOCK *holder = NULL; + PGPROC *proc; + PROCLOCK *proclock; LOCK *lock; - int lockmethod = DEFAULT_LOCKMETHOD; - LOCKMETHODTABLE *lockMethodTable; - HTAB *holderTable; + int lockmethodid = DEFAULT_LOCKMETHOD; + LockMethod lockMethodTable; + HTAB *proclockTable; HASH_SEQ_STATUS status; proc = MyProc; if (proc == NULL) return; - Assert(lockmethod < NumLockMethods); - lockMethodTable = LockMethodTable[lockmethod]; + Assert(lockmethodid < NumLockMethods); + lockMethodTable = LockMethods[lockmethodid]; if (!lockMethodTable) return; - holderTable = lockMethodTable->holderHash; + proclockTable = LockMethodProcLockHash[lockmethodid]; if (proc->waitLock) LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0); - hash_seq_init(&status, holderTable); - while ((holder = (PROCLOCK *) hash_seq_search(&status)) && - (holder != (PROCLOCK *) TRUE)) + hash_seq_init(&status, proclockTable); + while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL) { - PROCLOCK_PRINT("DumpAllLocks", holder); + PROCLOCK_PRINT("DumpAllLocks", proclock); - if (holder->tag.lock) + if (proclock->tag.lock) { - lock = (LOCK *) MAKE_PTR(holder->tag.lock); + lock = (LOCK *) MAKE_PTR(proclock->tag.lock); LOCK_PRINT("DumpAllLocks", lock, 0); } else - elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL"); + elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL"); } } -#endif /* LOCK_DEBUG */ +#endif /* LOCK_DEBUG */