X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fstorage%2Flmgr%2Flock.c;h=576fc205299eebe48047cab9f3dcdf5169ab35fe;hb=3a694bb0a16fea1662f1ffd31506a72effdd4a93;hp=8cb25b4ccdd16b8617921f0209a79f15981c11fb;hpb=d75b2ec4ebbc7fdb51088e89da47c6523bf2c640;p=postgresql diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 8cb25b4ccd..576fc20529 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -3,12 +3,12 @@ * lock.c * POSTGRES low-level lock mechanism * - * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group + * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.131 2003/12/20 17:31:21 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.150 2005/04/29 22:28:24 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -23,21 +23,22 @@ * Interface: * * LockAcquire(), LockRelease(), LockMethodTableInit(), - * LockMethodTableRename(), LockReleaseAll, + * LockMethodTableRename(), LockReleaseAll(), * LockCheckConflicts(), GrantLock() * *------------------------------------------------------------------------- */ #include "postgres.h" -#include #include +#include #include "access/xact.h" #include "miscadmin.h" #include "storage/proc.h" #include "utils/memutils.h" #include "utils/ps_status.h" +#include "utils/resowner.h" /* This configuration variable is used to set the lock table size */ @@ -46,12 +47,24 @@ int max_locks_per_xact; /* set by guc.c */ #define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends)) -static int WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, - LOCK *lock, PROCLOCK *proclock); -static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, - int *myHolding); +/* + * map from lock method id to the lock table data structures + */ +static LockMethod LockMethods[MAX_LOCK_METHODS]; +static HTAB *LockMethodLockHash[MAX_LOCK_METHODS]; +static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS]; +static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS]; -static char *lock_mode_names[] = +/* exported so lmgr.c can initialize it */ +int NumLockMethods; + + +/* private state for GrantAwaitedLock */ +static LOCALLOCK *awaitedLock; +static ResourceOwner awaitedOwner; + + +static const char *const lock_mode_names[] = { "INVALID", "AccessShareLock", @@ -84,7 +97,7 @@ static char *lock_mode_names[] = * -------- */ -int Trace_lock_oidmin = BootstrapObjectIdData; +int Trace_lock_oidmin = FirstNormalObjectId; bool Trace_locks = false; bool Trace_userlocks = false; int Trace_lock_table = 0; @@ -95,10 +108,11 @@ inline static bool LOCK_DEBUG_ENABLED(const LOCK *lock) { return - (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks) - || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks)) - && (lock->tag.relId >= (Oid) Trace_lock_oidmin)) - || (Trace_lock_table && (lock->tag.relId == Trace_lock_table)); + (((Trace_locks && LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD) + || (Trace_userlocks && LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD)) + && ((Oid) lock->tag.locktag_field2 >= (Oid) Trace_lock_oidmin)) + || (Trace_lock_table + && (lock->tag.locktag_field2 == Trace_lock_table)); } @@ -107,12 +121,14 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type) { if (LOCK_DEBUG_ENABLED(lock)) elog(LOG, - "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) " + "%s: lock(%lx) id(%u,%u,%u,%u,%u,%u) grantMask(%x) " "req(%d,%d,%d,%d,%d,%d,%d)=%d " "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)", where, MAKE_OFFSET(lock), - lock->tag.lockmethodid, lock->tag.relId, lock->tag.dbId, - lock->tag.objId.blkno, lock->grantMask, + lock->tag.locktag_field1, lock->tag.locktag_field2, + lock->tag.locktag_field3, lock->tag.locktag_field4, + lock->tag.locktag_type, lock->tag.locktag_lockmethodid, + lock->grantMask, lock->requested[1], lock->requested[2], lock->requested[3], lock->requested[4], lock->requested[5], lock->requested[6], lock->requested[7], lock->nRequested, @@ -126,20 +142,13 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type) inline static void PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) { - if ( - (((PROCLOCK_LOCKMETHOD(*proclockP) == DEFAULT_LOCKMETHOD && Trace_locks) - || (PROCLOCK_LOCKMETHOD(*proclockP) == USER_LOCKMETHOD && Trace_userlocks)) - && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin)) - || (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table)) - ) + if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock))) elog(LOG, - "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d", + "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)", where, MAKE_OFFSET(proclockP), proclockP->tag.lock, PROCLOCK_LOCKMETHOD(*(proclockP)), proclockP->tag.proc, proclockP->tag.xid, - proclockP->holding[1], proclockP->holding[2], proclockP->holding[3], - proclockP->holding[4], proclockP->holding[5], proclockP->holding[6], - proclockP->holding[7], proclockP->nHolding); + (int) proclockP->holdMask); } #else /* not LOCK_DEBUG */ @@ -149,13 +158,14 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) #endif /* not LOCK_DEBUG */ -/* - * map from lock method id to the lock table structure - */ -static LockMethod LockMethods[MAX_LOCK_METHODS]; -static HTAB* LockMethodLockHash[MAX_LOCK_METHODS]; -static HTAB* LockMethodProcLockHash[MAX_LOCK_METHODS]; -static int NumLockMethods; +static void RemoveLocalLock(LOCALLOCK *locallock); +static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); +static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, + ResourceOwner owner); +static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, + int *myHolding); +static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, + PROCLOCK *proclock, LockMethod lockMethodTable); /* @@ -175,7 +185,7 @@ InitLocks(void) LockMethod GetLocksMethodTable(LOCK *lock) { - LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock); + LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock); Assert(0 < lockmethodid && lockmethodid < NumLockMethods); return LockMethods[lockmethodid]; @@ -190,15 +200,15 @@ GetLocksMethodTable(LOCK *lock) */ static void LockMethodInit(LockMethod lockMethodTable, - LOCKMASK *conflictsP, + const LOCKMASK *conflictsP, int numModes) { int i; lockMethodTable->numLockModes = numModes; /* copies useless zero element as well as the N lockmodes */ - for (i = 0; i <= numModes; i++, conflictsP++) - lockMethodTable->conflictTab[i] = *conflictsP; + for (i = 0; i <= numModes; i++) + lockMethodTable->conflictTab[i] = conflictsP[i]; } /* @@ -211,12 +221,13 @@ LockMethodInit(LockMethod lockMethodTable, * TopMemoryContext. */ LOCKMETHODID -LockMethodTableInit(char *tabName, - LOCKMASK *conflictsP, +LockMethodTableInit(const char *tabName, + const LOCKMASK *conflictsP, int numModes, int maxBackends) { LockMethod newLockMethod; + LOCKMETHODID lockmethodid; char *shmemName; HASHCTL info; int hash_flags; @@ -226,11 +237,11 @@ LockMethodTableInit(char *tabName, if (numModes >= MAX_LOCKMODES) elog(ERROR, "too many lock types %d (limit is %d)", - numModes, MAX_LOCKMODES-1); + numModes, MAX_LOCKMODES - 1); /* Compute init/max size to request for lock hashtables */ max_table_size = NLOCKENTS(maxBackends); - init_table_size = max_table_size / 10; + init_table_size = max_table_size / 2; /* Allocate a string for the shmem index table lookups. */ /* This is just temp space in this routine, so palloc is OK. */ @@ -244,17 +255,6 @@ LockMethodTableInit(char *tabName, if (!newLockMethod) elog(FATAL, "could not initialize lock table \"%s\"", tabName); - /* - * Lock the LWLock for the table (probably not necessary here) - */ -#ifndef EXEC_BACKEND - LWLockAcquire(LockMgrLock, LW_EXCLUSIVE); -#endif - /* - * no zero-th table - */ - NumLockMethods = 1; - /* * we're first - initialize */ @@ -262,39 +262,39 @@ LockMethodTableInit(char *tabName, { MemSet(newLockMethod, 0, sizeof(LockMethodData)); newLockMethod->masterLock = LockMgrLock; - newLockMethod->lockmethodid = NumLockMethods; + LockMethodInit(newLockMethod, conflictsP, numModes); } /* * other modules refer to the lock table by a lockmethod ID */ - LockMethods[NumLockMethods] = newLockMethod; - NumLockMethods++; - Assert(NumLockMethods <= MAX_LOCK_METHODS); + Assert(NumLockMethods < MAX_LOCK_METHODS); + lockmethodid = NumLockMethods++; + LockMethods[lockmethodid] = newLockMethod; /* * allocate a hash table for LOCK structs. This is used to store * per-locked-object information. */ + MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(LOCKTAG); info.entrysize = sizeof(LOCK); info.hash = tag_hash; hash_flags = (HASH_ELEM | HASH_FUNCTION); sprintf(shmemName, "%s (lock hash)", tabName); - LockMethodLockHash[NumLockMethods-1] = ShmemInitHash(shmemName, - init_table_size, - max_table_size, - &info, - hash_flags); + LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName, + init_table_size, + max_table_size, + &info, + hash_flags); - if (!LockMethodLockHash[NumLockMethods-1]) + if (!LockMethodLockHash[lockmethodid]) elog(FATAL, "could not initialize lock table \"%s\"", tabName); - Assert(LockMethodLockHash[NumLockMethods-1]->hash == tag_hash); /* * allocate a hash table for PROCLOCK structs. This is used to store - * per-lock-proclock information. + * per-lock-holder information. */ info.keysize = sizeof(PROCLOCKTAG); info.entrysize = sizeof(PROCLOCK); @@ -302,43 +302,57 @@ LockMethodTableInit(char *tabName, hash_flags = (HASH_ELEM | HASH_FUNCTION); sprintf(shmemName, "%s (proclock hash)", tabName); - LockMethodProcLockHash[NumLockMethods-1] = ShmemInitHash(shmemName, - init_table_size, - max_table_size, - &info, - hash_flags); + LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName, + init_table_size, + max_table_size, + &info, + hash_flags); - if (!LockMethodProcLockHash[NumLockMethods-1]) + if (!LockMethodProcLockHash[lockmethodid]) elog(FATAL, "could not initialize lock table \"%s\"", tabName); - /* init data structures */ - LockMethodInit(newLockMethod, conflictsP, numModes); + /* + * allocate a non-shared hash table for LOCALLOCK structs. This is + * used to store lock counts and resource owner information. + * + * The non-shared table could already exist in this process (this occurs + * when the postmaster is recreating shared memory after a backend + * crash). If so, delete and recreate it. (We could simply leave it, + * since it ought to be empty in the postmaster, but for safety let's + * zap it.) + */ + if (LockMethodLocalHash[lockmethodid]) + hash_destroy(LockMethodLocalHash[lockmethodid]); + + info.keysize = sizeof(LOCALLOCKTAG); + info.entrysize = sizeof(LOCALLOCK); + info.hash = tag_hash; + hash_flags = (HASH_ELEM | HASH_FUNCTION); + + sprintf(shmemName, "%s (locallock hash)", tabName); + LockMethodLocalHash[lockmethodid] = hash_create(shmemName, + 128, + &info, + hash_flags); -#ifndef EXEC_BACKEND - LWLockRelease(LockMgrLock); -#endif pfree(shmemName); - return newLockMethod->lockmethodid; + return lockmethodid; } /* * LockMethodTableRename -- allocate another lockmethod ID to the same * lock table. * - * NOTES: Both the lock module and the lock chain (lchain.c) - * module use table id's to distinguish between different - * kinds of locks. Short term and long term locks look - * the same to the lock table, but are handled differently - * by the lock chain manager. This function allows the - * client to use different lockmethods when acquiring/releasing - * short term and long term locks, yet store them all in one hashtable. + * NOTES: This function makes it possible to have different lockmethodids, + * and hence different locking semantics, while still storing all + * the data in one shared-memory hashtable. */ LOCKMETHODID LockMethodTableRename(LOCKMETHODID lockmethodid) { - LOCKMETHODID newLockMethodId; + LOCKMETHODID newLockMethodId; if (NumLockMethods >= MAX_LOCK_METHODS) return INVALID_LOCKMETHOD; @@ -350,6 +364,10 @@ LockMethodTableRename(LOCKMETHODID lockmethodid) NumLockMethods++; LockMethods[newLockMethodId] = LockMethods[lockmethodid]; + LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid]; + LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid]; + LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid]; + return newLockMethodId; } @@ -380,33 +398,16 @@ LockMethodTableRename(LOCKMETHODID lockmethodid) * the lock. While the lock is active other clients can still * read and write the tuple but they can be aware that it has * been locked at the application level by someone. - * User locks use lock tags made of an uint16 and an uint32, for - * example 0 and a tuple oid, or any other arbitrary pair of - * numbers following a convention established by the application. - * In this sense tags don't refer to tuples or database entities. + * * User locks and normal locks are completely orthogonal and - * they don't interfere with each other, so it is possible - * to acquire a normal lock on an user-locked tuple or user-lock - * a tuple for which a normal write lock already exists. + * they don't interfere with each other. + * * User locks are always non blocking, therefore they are never * acquired if already held by another process. They must be * released explicitly by the application but they are released * automatically when a backend terminates. * They are indicated by a lockmethod 2 which is an alias for the - * normal lock table, and are distinguished from normal locks - * by the following differences: - * - * normal lock user lock - * - * lockmethodid 1 2 - * tag.dbId database oid database oid - * tag.relId rel oid or 0 0 - * tag.objId block id lock id2 - * or xact id - * tag.offnum 0 lock id1 - * proclock.xid xid or 0 0 - * persistence transaction user or backend - * or backend + * normal lock table. * * The lockmode parameter can have the same values for normal locks * although probably only WRITE_LOCK can have some practical use. @@ -418,11 +419,13 @@ bool LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode, bool dontWait) { + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; + LOCK *lock; PROCLOCK *proclock; PROCLOCKTAG proclocktag; - HTAB *proclockTable; bool found; - LOCK *lock; + ResourceOwner owner; LWLockId masterLock; LockMethod lockMethodTable; int status; @@ -430,13 +433,14 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, int i; #ifdef LOCK_DEBUG - if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks) - elog(LOG, "LockAcquire: user lock [%u] %s", - locktag->objId.blkno, lock_mode_names[lockmode]); + if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD) + elog(LOG, "LockAcquire: user lock [%u,%u] %s", + locktag->locktag_field1, locktag->locktag_field2, + lock_mode_names[lockmode]); #endif - /* ???????? This must be changed when short term locks will be used */ - locktag->lockmethodid = lockmethodid; + /* ugly */ + locktag->locktag_lockmethodid = lockmethodid; Assert(lockmethodid < NumLockMethods); lockMethodTable = LockMethods[lockmethodid]; @@ -446,14 +450,83 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, return FALSE; } + /* Session locks and user locks are not transactional */ + if (xid != InvalidTransactionId && + lockmethodid == DEFAULT_LOCKMETHOD) + owner = CurrentResourceOwner; + else + owner = NULL; + + /* + * Find or create a LOCALLOCK entry for this lock and lockmode + */ + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.xid = xid; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &localtag, + HASH_ENTER, &found); + if (!locallock) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + + /* + * if it's a new locallock object, initialize it + */ + if (!found) + { + locallock->lock = NULL; + locallock->proclock = NULL; + locallock->nLocks = 0; + locallock->numLockOwners = 0; + locallock->maxLockOwners = 8; + locallock->lockOwners = NULL; + locallock->lockOwners = (LOCALLOCKOWNER *) + MemoryContextAlloc(TopMemoryContext, + locallock->maxLockOwners * sizeof(LOCALLOCKOWNER)); + } + else + { + /* Make sure there will be room to remember the lock */ + if (locallock->numLockOwners >= locallock->maxLockOwners) + { + int newsize = locallock->maxLockOwners * 2; + + locallock->lockOwners = (LOCALLOCKOWNER *) + repalloc(locallock->lockOwners, + newsize * sizeof(LOCALLOCKOWNER)); + locallock->maxLockOwners = newsize; + } + } + + /* + * If we already hold the lock, we can just increase the count + * locally. + */ + if (locallock->nLocks > 0) + { + GrantLockLocal(locallock, owner); + return TRUE; + } + + /* + * Otherwise we've got to mess with the shared lock table. + */ masterLock = lockMethodTable->masterLock; LWLockAcquire(masterLock, LW_EXCLUSIVE); /* - * Find or create a lock with this tag + * Find or create a lock with this tag. + * + * Note: if the locallock object already existed, it might have a pointer + * to the lock already ... but we probably should not assume that that + * pointer is valid, since a lock object with no locks can go away + * anytime. */ - Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash); lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], (void *) locktag, HASH_ENTER, &found); @@ -463,8 +536,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), - errhint("You may need to increase max_locks_per_transaction."))); + errhint("You may need to increase max_locks_per_transaction."))); } + locallock->lock = lock; /* * if it's a new lock object, initialize it @@ -473,7 +547,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, { lock->grantMask = 0; lock->waitMask = 0; - SHMQueueInit(&(lock->lockHolders)); + SHMQueueInit(&(lock->procLocks)); ProcQueueInit(&(lock->waitProcs)); lock->nRequested = 0; lock->nGranted = 0; @@ -492,8 +566,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * Create the hash key for the proclock table. */ - MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, - * needed */ + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ proclocktag.lock = MAKE_OFFSET(lock); proclocktag.proc = MAKE_OFFSET(MyProc); TransactionIdStore(xid, &proclocktag.xid); @@ -501,36 +574,50 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * Find or create a proclock entry with this tag */ - proclockTable = LockMethodProcLockHash[lockmethodid]; - proclock = (PROCLOCK *) hash_search(proclockTable, + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], (void *) &proclocktag, HASH_ENTER, &found); if (!proclock) { + /* Ooops, not enough shmem for the proclock */ + if (lock->nRequested == 0) + { + /* + * There are no other requestors of this lock, so garbage-collect + * the lock object. We *must* do this to avoid a permanent leak + * of shared memory, because there won't be anything to cause + * anyone to release the lock object later. + */ + Assert(SHMQueueEmpty(&(lock->procLocks))); + lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], + (void *) &(lock->tag), + HASH_REMOVE, NULL); + } LWLockRelease(masterLock); + if (!lock) /* hash remove failed? */ + elog(WARNING, "lock table corrupted"); ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), errmsg("out of shared memory"), - errhint("You may need to increase max_locks_per_transaction."))); + errhint("You may need to increase max_locks_per_transaction."))); } + locallock->proclock = proclock; /* * If new, initialize the new entry */ if (!found) { - proclock->nHolding = 0; - MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES); + proclock->holdMask = 0; /* Add proclock to appropriate lists */ - SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink); - SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink); + SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); + SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else { PROCLOCK_PRINT("LockAcquire: found", proclock); - Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0)); - Assert(proclock->nHolding <= lock->nGranted); + Assert((proclock->holdMask & ~lock->grantMask) == 0); #ifdef CHECK_DEADLOCK_RISK @@ -551,7 +638,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, */ for (i = lockMethodTable->numLockModes; i > 0; i--) { - if (proclock->holding[i] > 0) + if (proclock->holdMask & LOCKBIT_ON(i)) { if (i >= (int) lockmode) break; /* safe: we have a lock >= req level */ @@ -575,25 +662,14 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* - * If I already hold one or more locks of the requested type, just - * grant myself another one without blocking. - */ - if (proclock->holding[lockmode] > 0) - { - GrantLock(lock, proclock, lockmode); - PROCLOCK_PRINT("LockAcquire: owning", proclock); - LWLockRelease(masterLock); - return TRUE; - } - - /* - * If this process (under any XID) is a proclock of the lock, also - * grant myself another one without blocking. + * If this process (under any XID) is a holder of the lock, just grant + * myself another one without blocking. */ LockCountMyLocks(proclock->tag.lock, MyProc, myHolding); if (myHolding[lockmode] > 0) { GrantLock(lock, proclock, lockmode); + GrantLockLocal(locallock, owner); PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock); LWLockRelease(masterLock); return TRUE; @@ -615,6 +691,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, { /* No conflict with held or previously requested locks */ GrantLock(lock, proclock, lockmode); + GrantLockLocal(locallock, owner); } else { @@ -622,29 +699,31 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * We can't acquire the lock immediately. If caller specified no - * blocking, remove the proclock entry and return FALSE without + * blocking, remove useless table entries and return FALSE without * waiting. */ if (dontWait) { - if (proclock->nHolding == 0) + if (proclock->holdMask == 0) { SHMQueueDelete(&proclock->lockLink); SHMQueueDelete(&proclock->procLink); - proclock = (PROCLOCK *) hash_search(proclockTable, - (void *) proclock, + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), HASH_REMOVE, NULL); if (!proclock) elog(WARNING, "proclock table corrupted"); } else - PROCLOCK_PRINT("LockAcquire: NHOLDING", proclock); + PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); lock->nRequested--; lock->requested[lockmode]--; LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); LWLockRelease(masterLock); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); return FALSE; } @@ -652,7 +731,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, * Construct bitmask of locks this process holds on this object. */ { - LOCKMASK heldLocks = 0; + LOCKMASK heldLocks = 0; for (i = 1; i <= lockMethodTable->numLockModes; i++) { @@ -665,7 +744,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * Sleep till someone wakes me up. */ - status = WaitOnLock(lockmethodid, lockmode, lock, proclock); + status = WaitOnLock(lockmethodid, locallock, owner); /* * NOTE: do not do any material change of state between here and @@ -678,7 +757,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, * Check the proclock entry status, in case something in the ipc * communication doesn't work correctly. */ - if (!((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0))) + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); @@ -695,6 +774,23 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, return status == STATUS_OK; } +/* + * Subroutine to free a locallock entry + */ +static void +RemoveLocalLock(LOCALLOCK *locallock) +{ + LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock); + + pfree(locallock->lockOwners); + locallock->lockOwners = NULL; + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &(locallock->tag), + HASH_REMOVE, NULL); + if (!locallock) + elog(WARNING, "locallock table corrupted"); +} + /* * LockCheckConflicts -- test whether requested lock conflicts * with those already granted @@ -789,24 +885,29 @@ LockCheckConflicts(LockMethod lockMethodTable, static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding) { - SHM_QUEUE *procHolders = &(proc->procHolders); + SHM_QUEUE *procLocks = &(proc->procLocks); PROCLOCK *proclock; - int i; MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int)); - proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); while (proclock) { if (lockOffset == proclock->tag.lock) { + LOCKMASK holdMask = proclock->holdMask; + int i; + for (i = 1; i < MAX_LOCKMODES; i++) - myHolding[i] += proclock->holding[i]; + { + if (holdMask & LOCKBIT_ON(i)) + myHolding[i]++; + } } - proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); } } @@ -816,7 +917,11 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding) * the lock request has been granted. * * NOTE: if proc was blocked, it also needs to be removed from the wait list - * and have its waitLock/waitHolder fields cleared. That's not done here. + * and have its waitLock/waitProcLock fields cleared. That's not done here. + * + * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK + * table entry; but since we may be awaking some other process, we can't do + * that here; it's done by GrantLockLocal, instead. */ void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) @@ -826,12 +931,112 @@ GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) lock->grantMask |= LOCKBIT_ON(lockmode); if (lock->granted[lockmode] == lock->requested[lockmode]) lock->waitMask &= LOCKBIT_OFF(lockmode); + proclock->holdMask |= LOCKBIT_ON(lockmode); LOCK_PRINT("GrantLock", lock, lockmode); Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); Assert(lock->nGranted <= lock->nRequested); - proclock->holding[lockmode]++; - proclock->nHolding++; - Assert((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0)); +} + +/* + * UnGrantLock -- opposite of GrantLock. + * + * Updates the lock and proclock data structures to show that the lock + * is no longer held nor requested by the current holder. + * + * Returns true if there were any waiters waiting on the lock that + * should now be woken up with ProcLockWakeup. + */ +static bool +UnGrantLock(LOCK *lock, LOCKMODE lockmode, + PROCLOCK *proclock, LockMethod lockMethodTable) +{ + bool wakeupNeeded = false; + + Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); + Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); + Assert(lock->nGranted <= lock->nRequested); + + /* + * fix the general lock stats + */ + lock->nRequested--; + lock->requested[lockmode]--; + lock->nGranted--; + lock->granted[lockmode]--; + + if (lock->granted[lockmode] == 0) + { + /* change the conflict mask. No more of this lock type. */ + lock->grantMask &= LOCKBIT_OFF(lockmode); + } + + LOCK_PRINT("UnGrantLock: updated", lock, lockmode); + + /* + * We need only run ProcLockWakeup if the released lock conflicts with + * at least one of the lock types requested by waiter(s). Otherwise + * whatever conflict made them wait must still exist. NOTE: before + * MVCC, we could skip wakeup if lock->granted[lockmode] was still + * positive. But that's not true anymore, because the remaining + * granted locks might belong to some waiter, who could now be + * awakened because he doesn't conflict with his own locks. + */ + if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) + wakeupNeeded = true; + + /* + * Now fix the per-proclock state. + */ + proclock->holdMask &= LOCKBIT_OFF(lockmode); + PROCLOCK_PRINT("UnGrantLock: updated", proclock); + + return wakeupNeeded; +} + +/* + * GrantLockLocal -- update the locallock data structures to show + * the lock request has been granted. + * + * We expect that LockAcquire made sure there is room to add a new + * ResourceOwner entry. + */ +static void +GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner) +{ + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + int i; + + Assert(locallock->numLockOwners < locallock->maxLockOwners); + /* Count the total */ + locallock->nLocks++; + /* Count the per-owner lock */ + for (i = 0; i < locallock->numLockOwners; i++) + { + if (lockOwners[i].owner == owner) + { + lockOwners[i].nLocks++; + return; + } + } + lockOwners[i].owner = owner; + lockOwners[i].nLocks = 1; + locallock->numLockOwners++; +} + +/* + * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing + * WaitOnLock on. + * + * proc.c needs this for the case where we are booted off the lock by + * timeout, but discover that someone granted us the lock anyway. + * + * We could just export GrantLockLocal, but that would require including + * resowner.h in lock.h, which creates circularity. + */ +void +GrantAwaitedLock(void) +{ + GrantLockLocal(awaitedLock, awaitedOwner); } /* @@ -843,23 +1048,29 @@ GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) * The locktable's masterLock must be held at entry. */ static int -WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, - LOCK *lock, PROCLOCK *proclock) +WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, + ResourceOwner owner) { LockMethod lockMethodTable = LockMethods[lockmethodid]; char *new_status, *old_status; + size_t len; Assert(lockmethodid < NumLockMethods); - LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode); + LOCK_PRINT("WaitOnLock: sleeping on lock", + locallock->lock, locallock->tag.mode); old_status = pstrdup(get_ps_display()); - new_status = (char *) palloc(strlen(old_status) + 10); - strcpy(new_status, old_status); - strcat(new_status, " waiting"); + len = strlen(old_status); + new_status = (char *) palloc(len + 8 + 1); + memcpy(new_status, old_status, len); + strcpy(new_status + len, " waiting"); set_ps_display(new_status); + awaitedLock = locallock; + awaitedOwner = owner; + /* * NOTE: Think not to put any shared-state cleanup after the call to * ProcSleep, in either the normal or failure path. The lock state @@ -875,16 +1086,17 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, */ if (ProcSleep(lockMethodTable, - lockmode, - lock, - proclock) != STATUS_OK) + locallock->tag.mode, + locallock->lock, + locallock->proclock) != STATUS_OK) { /* * We failed as a result of a deadlock, see CheckDeadLock(). Quit - * now. Removal of the proclock and lock objects, if no longer - * needed, will happen in xact cleanup (see above for motivation). + * now. */ - LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode); + awaitedLock = NULL; + LOCK_PRINT("WaitOnLock: aborting on lock", + locallock->lock, locallock->tag.mode); LWLockRelease(lockMethodTable->masterLock); /* @@ -895,11 +1107,14 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, /* not reached */ } + awaitedLock = NULL; + set_ps_display(old_status); pfree(old_status); pfree(new_status); - LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode); + LOCK_PRINT("WaitOnLock: wakeup on lock", + locallock->lock, locallock->tag.mode); return STATUS_OK; } @@ -909,22 +1124,21 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, * * Locktable lock must be held by caller. * - * NB: this does not remove the process' proclock object, nor the lock object, - * even though their counts might now have gone to zero. That will happen - * during a subsequent LockReleaseAll call, which we expect will happen - * during transaction cleanup. (Removal of a proc from its wait queue by - * this routine can only happen if we are aborting the transaction.) + * NB: this does not clean up any locallock object that may exist for the lock. */ void RemoveFromWaitQueue(PGPROC *proc) { LOCK *waitLock = proc->waitLock; + PROCLOCK *proclock = proc->waitProcLock; LOCKMODE lockmode = proc->waitLockMode; + LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock); /* Make sure proc is waiting */ Assert(proc->links.next != INVALID_OFFSET); Assert(waitLock); Assert(waitLock->waitProcs.size > 0); + Assert(0 < lockmethodid && lockmethodid < NumLockMethods); /* Remove proc from lock's wait queue */ SHMQueueDelete(&(proc->links)); @@ -942,10 +1156,34 @@ RemoveFromWaitQueue(PGPROC *proc) /* Clean up the proc's own state */ proc->waitLock = NULL; - proc->waitHolder = NULL; + proc->waitProcLock = NULL; + + /* + * Delete the proclock immediately if it represents no already-held locks. + * This must happen now because if the owner of the lock decides to release + * it, and the requested/granted counts then go to zero, LockRelease + * expects there to be no remaining proclocks. + */ + if (proclock->holdMask == 0) + { + PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock); + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), + HASH_REMOVE, NULL); + if (!proclock) + elog(WARNING, "proclock table corrupted"); + } + + /* + * There should still be some requests for the lock ... else what were + * we waiting for? Therefore no need to delete the lock object. + */ + Assert(waitLock->nRequested > 0); /* See if any other waiters for the lock can be woken up now */ - ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock); + ProcLockWakeup(LockMethods[lockmethodid], waitLock); } /* @@ -962,21 +1200,23 @@ bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode) { + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; LOCK *lock; + PROCLOCK *proclock; LWLockId masterLock; LockMethod lockMethodTable; - PROCLOCK *proclock; - PROCLOCKTAG proclocktag; - HTAB *proclockTable; - bool wakeupNeeded = false; + bool wakeupNeeded; #ifdef LOCK_DEBUG - if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks) - elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode); + if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD) + elog(LOG, "LockRelease: user lock [%u,%u] %s", + locktag->locktag_field1, locktag->locktag_field2, + lock_mode_names[lockmode]); #endif - /* ???????? This must be changed when short term locks will be used */ - locktag->lockmethodid = lockmethodid; + /* ugly */ + locktag->locktag_lockmethodid = lockmethodid; Assert(lockmethodid < NumLockMethods); lockMethodTable = LockMethods[lockmethodid]; @@ -986,190 +1226,193 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, return FALSE; } - masterLock = lockMethodTable->masterLock; - LWLockAcquire(masterLock, LW_EXCLUSIVE); - /* - * Find a lock with this tag + * Find the LOCALLOCK entry for this lock and lockmode */ - Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash); - lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], - (void *) locktag, - HASH_FIND, NULL); + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.xid = xid; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &localtag, + HASH_FIND, NULL); /* * let the caller print its own error message, too. Do not * ereport(ERROR). */ - if (!lock) + if (!locallock || locallock->nLocks <= 0) { - LWLockRelease(masterLock); - elog(WARNING, "no such lock"); + elog(WARNING, "you don't own a lock of type %s", + lock_mode_names[lockmode]); return FALSE; } - LOCK_PRINT("LockRelease: found", lock, lockmode); /* - * Find the proclock entry for this proclock. + * Decrease the count for the resource owner. */ - MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, - * needed */ - proclocktag.lock = MAKE_OFFSET(lock); - proclocktag.proc = MAKE_OFFSET(MyProc); - TransactionIdStore(xid, &proclocktag.xid); - - proclockTable = LockMethodProcLockHash[lockmethodid]; - proclock = (PROCLOCK *) hash_search(proclockTable, - (void *) &proclocktag, - HASH_FIND_SAVE, NULL); - if (!proclock) { - LWLockRelease(masterLock); -#ifdef USER_LOCKS - if (lockmethodid == USER_LOCKMETHOD) - elog(WARNING, "no lock with this tag"); + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + ResourceOwner owner; + int i; + + /* Session locks and user locks are not transactional */ + if (xid != InvalidTransactionId && + lockmethodid == DEFAULT_LOCKMETHOD) + owner = CurrentResourceOwner; else -#endif - elog(WARNING, "proclock table corrupted"); - return FALSE; + owner = NULL; + + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == owner) + { + Assert(lockOwners[i].nLocks > 0); + if (--lockOwners[i].nLocks == 0) + { + /* compact out unused slot */ + locallock->numLockOwners--; + if (i < locallock->numLockOwners) + lockOwners[i] = lockOwners[locallock->numLockOwners]; + } + break; + } + } + if (i < 0) + { + /* don't release a lock belonging to another owner */ + elog(WARNING, "you don't own a lock of type %s", + lock_mode_names[lockmode]); + return FALSE; + } } + + /* + * Decrease the total local count. If we're still holding the lock, + * we're done. + */ + locallock->nLocks--; + + if (locallock->nLocks > 0) + return TRUE; + + /* + * Otherwise we've got to mess with the shared lock table. + */ + masterLock = lockMethodTable->masterLock; + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + /* + * We don't need to re-find the lock or proclock, since we kept their + * addresses in the locallock table, and they couldn't have been + * removed while we were holding a lock on them. + */ + lock = locallock->lock; + LOCK_PRINT("LockRelease: found", lock, lockmode); + proclock = locallock->proclock; PROCLOCK_PRINT("LockRelease: found", proclock); /* - * Check that we are actually holding a lock of the type we want to - * release. + * Double-check that we are actually holding a lock of the type we + * want to release. */ - if (!(proclock->holding[lockmode] > 0)) + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock); - Assert(proclock->holding[lockmode] >= 0); LWLockRelease(masterLock); elog(WARNING, "you don't own a lock of type %s", lock_mode_names[lockmode]); + RemoveLocalLock(locallock); return FALSE; } - Assert(proclock->nHolding > 0); - Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); - Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); - Assert(lock->nGranted <= lock->nRequested); + + wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable); /* - * fix the general lock stats + * If this was my last hold on this lock, delete my entry in the + * proclock table. */ - lock->nRequested--; - lock->requested[lockmode]--; - lock->nGranted--; - lock->granted[lockmode]--; - - if (lock->granted[lockmode] == 0) + if (proclock->holdMask == 0) { - /* change the conflict mask. No more of this lock type. */ - lock->grantMask &= LOCKBIT_OFF(lockmode); + PROCLOCK_PRINT("LockRelease: deleting proclock", proclock); + SHMQueueDelete(&proclock->lockLink); + SHMQueueDelete(&proclock->procLink); + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), + HASH_REMOVE, NULL); + if (!proclock) + { + LWLockRelease(masterLock); + elog(WARNING, "proclock table corrupted"); + RemoveLocalLock(locallock); + return FALSE; + } } - LOCK_PRINT("LockRelease: updated", lock, lockmode); - Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0)); - Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0)); - Assert(lock->nGranted <= lock->nRequested); - - /* - * We need only run ProcLockWakeup if the released lock conflicts with - * at least one of the lock types requested by waiter(s). Otherwise - * whatever conflict made them wait must still exist. NOTE: before - * MVCC, we could skip wakeup if lock->granted[lockmode] was still - * positive. But that's not true anymore, because the remaining - * granted locks might belong to some waiter, who could now be - * awakened because he doesn't conflict with his own locks. - */ - if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) - wakeupNeeded = true; - if (lock->nRequested == 0) { /* - * if there's no one waiting in the queue, we just released the - * last lock on this object. Delete it from the lock table. + * We've just released the last lock, so garbage-collect the lock + * object. */ - Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash); + LOCK_PRINT("LockRelease: deleting lock", lock, lockmode); + Assert(SHMQueueEmpty(&(lock->procLocks))); lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], (void *) &(lock->tag), - HASH_REMOVE, - NULL); + HASH_REMOVE, NULL); if (!lock) { LWLockRelease(masterLock); elog(WARNING, "lock table corrupted"); + RemoveLocalLock(locallock); return FALSE; } - wakeupNeeded = false; /* should be false, but make sure */ } - - /* - * Now fix the per-proclock lock stats. - */ - proclock->holding[lockmode]--; - proclock->nHolding--; - PROCLOCK_PRINT("LockRelease: updated", proclock); - Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0)); - - /* - * If this was my last hold on this lock, delete my entry in the - * proclock table. - */ - if (proclock->nHolding == 0) + else { - PROCLOCK_PRINT("LockRelease: deleting", proclock); - SHMQueueDelete(&proclock->lockLink); - SHMQueueDelete(&proclock->procLink); - proclock = (PROCLOCK *) hash_search(proclockTable, - (void *) &proclock, - HASH_REMOVE_SAVED, NULL); - if (!proclock) - { - LWLockRelease(masterLock); - elog(WARNING, "proclock table corrupted"); - return FALSE; - } + /* + * Wake up waiters if needed. + */ + if (wakeupNeeded) + ProcLockWakeup(lockMethodTable, lock); } - /* - * Wake up waiters if needed. - */ - if (wakeupNeeded) - ProcLockWakeup(lockMethodTable, lock); - LWLockRelease(masterLock); + + RemoveLocalLock(locallock); return TRUE; } /* - * LockReleaseAll -- Release all locks in a process's lock list. + * LockReleaseAll -- Release all locks of the specified lock method that + * are held by the current process. * - * Well, not really *all* locks. + * Well, not necessarily *all* locks. The available behaviors are: * - * If 'allxids' is TRUE, all locks of the specified lock method are - * released, regardless of transaction affiliation. + * allxids == true: release all locks regardless of transaction + * affiliation. * - * If 'allxids' is FALSE, all locks of the specified lock method and - * specified XID are released. + * allxids == false: release all locks with Xid != 0 + * (zero is the Xid used for "session" locks). */ bool -LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, - bool allxids, TransactionId xid) +LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) { - SHM_QUEUE *procHolders = &(proc->procHolders); - PROCLOCK *proclock; - PROCLOCK *nextHolder; + HASH_SEQ_STATUS status; + SHM_QUEUE *procLocks = &(MyProc->procLocks); LWLockId masterLock; LockMethod lockMethodTable; int i, numLockModes; + LOCALLOCK *locallock; + PROCLOCK *proclock; LOCK *lock; #ifdef LOCK_DEBUG if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) - elog(LOG, "LockReleaseAll: lockmethod=%d, pid=%d", - lockmethodid, proc->pid); + elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid); #endif Assert(lockmethodid < NumLockMethods); @@ -1183,20 +1426,58 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, numLockModes = lockMethodTable->numLockModes; masterLock = lockMethodTable->masterLock; + /* + * First we run through the locallock table and get rid of unwanted + * entries, then we scan the process's proclocks and get rid of those. + * We do this separately because we may have multiple locallock + * entries pointing to the same proclock, and we daren't end up with + * any dangling pointers. + */ + hash_seq_init(&status, LockMethodLocalHash[lockmethodid]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + if (locallock->proclock == NULL || locallock->lock == NULL) + { + /* + * We must've run out of shared memory while trying to set up + * this lock. Just forget the local entry. + */ + Assert(locallock->nLocks == 0); + RemoveLocalLock(locallock); + continue; + } + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) + continue; + + /* + * Ignore locks with Xid=0 unless we are asked to release all + * locks + */ + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId) + && !allxids) + continue; + + RemoveLocalLock(locallock); + } + LWLockAcquire(masterLock, LW_EXCLUSIVE); - proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); while (proclock) { bool wakeupNeeded = false; + PROCLOCK *nextHolder; /* Get link first, since we may unlink/delete this proclock */ - nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink, + nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); - Assert(proclock->tag.proc == MAKE_OFFSET(proc)); + Assert(proclock->tag.proc == MAKE_OFFSET(MyProc)); lock = (LOCK *) MAKE_PTR(proclock->tag.lock); @@ -1204,8 +1485,12 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, if (LOCK_LOCKMETHOD(*lock) != lockmethodid) goto next_item; - /* If not allxids, ignore items that are of the wrong xid */ - if (!allxids && !TransactionIdEquals(xid, proclock->tag.xid)) + /* + * Ignore locks with Xid=0 unless we are asked to release all + * locks + */ + if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId) + && !allxids) goto next_item; PROCLOCK_PRINT("LockReleaseAll", proclock); @@ -1213,53 +1498,22 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, Assert(lock->nRequested >= 0); Assert(lock->nGranted >= 0); Assert(lock->nGranted <= lock->nRequested); - Assert(proclock->nHolding >= 0); - Assert(proclock->nHolding <= lock->nRequested); + Assert((proclock->holdMask & ~lock->grantMask) == 0); /* * fix the general lock stats */ - if (lock->nRequested != proclock->nHolding) - { - for (i = 1; i <= numLockModes; i++) - { - Assert(proclock->holding[i] >= 0); - if (proclock->holding[i] > 0) - { - lock->requested[i] -= proclock->holding[i]; - lock->granted[i] -= proclock->holding[i]; - Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0); - if (lock->granted[i] == 0) - lock->grantMask &= LOCKBIT_OFF(i); - - /* - * Read comments in LockRelease - */ - if (!wakeupNeeded && - lockMethodTable->conflictTab[i] & lock->waitMask) - wakeupNeeded = true; - } - } - lock->nRequested -= proclock->nHolding; - lock->nGranted -= proclock->nHolding; - Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); - Assert(lock->nGranted <= lock->nRequested); - } - else + if (proclock->holdMask) { - /* - * This proclock accounts for all the requested locks on the - * object, so we can be lazy and just zero things out. - */ - lock->nRequested = 0; - lock->nGranted = 0; - /* Fix the lock status, just for next LOCK_PRINT message. */ for (i = 1; i <= numLockModes; i++) { - Assert(lock->requested[i] == lock->granted[i]); - lock->requested[i] = lock->granted[i] = 0; + if (proclock->holdMask & LOCKBIT_ON(i)) + wakeupNeeded |= UnGrantLock(lock, i, proclock, + lockMethodTable); } } + Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); + Assert(lock->nGranted <= lock->nRequested); LOCK_PRINT("LockReleaseAll: updated", lock, 0); PROCLOCK_PRINT("LockReleaseAll: deleting", proclock); @@ -1274,7 +1528,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, * remove the proclock entry from the hashtable */ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], - (void *) proclock, + (void *) &(proclock->tag), HASH_REMOVE, NULL); if (!proclock) @@ -1291,14 +1545,14 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, * lock object. */ LOCK_PRINT("LockReleaseAll: deleting", lock, 0); - Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash); + Assert(SHMQueueEmpty(&(lock->procLocks))); lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], (void *) &(lock->tag), HASH_REMOVE, NULL); if (!lock) { LWLockRelease(masterLock); - elog(WARNING, "cannot remove lock from HTAB"); + elog(WARNING, "lock table corrupted"); return FALSE; } } @@ -1319,15 +1573,141 @@ next_item: return TRUE; } +/* + * LockReleaseCurrentOwner + * Release all locks belonging to CurrentResourceOwner + * + * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner. + */ +void +LockReleaseCurrentOwner(void) +{ + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + LOCALLOCKOWNER *lockOwners; + int i; + + hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + /* Ignore items that must be nontransactional */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD) + continue; + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)) + continue; + + /* Scan to see if there are any locks belonging to current owner */ + lockOwners = locallock->lockOwners; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == CurrentResourceOwner) + { + Assert(lockOwners[i].nLocks > 0); + if (lockOwners[i].nLocks < locallock->nLocks) + { + /* + * We will still hold this lock after forgetting this + * ResourceOwner. + */ + locallock->nLocks -= lockOwners[i].nLocks; + /* compact out unused slot */ + locallock->numLockOwners--; + if (i < locallock->numLockOwners) + lockOwners[i] = lockOwners[locallock->numLockOwners]; + } + else + { + Assert(lockOwners[i].nLocks == locallock->nLocks); + /* We want to call LockRelease just once */ + lockOwners[i].nLocks = 1; + locallock->nLocks = 1; + if (!LockRelease(DEFAULT_LOCKMETHOD, + &locallock->tag.lock, + locallock->tag.xid, + locallock->tag.mode)) + elog(WARNING, "LockReleaseCurrentOwner: failed??"); + } + break; + } + } + } +} + +/* + * LockReassignCurrentOwner + * Reassign all locks belonging to CurrentResourceOwner to belong + * to its parent resource owner + */ +void +LockReassignCurrentOwner(void) +{ + ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner); + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + LOCALLOCKOWNER *lockOwners; + + Assert(parent != NULL); + + hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + int i; + int ic = -1; + int ip = -1; + + /* Ignore items that must be nontransactional */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD) + continue; + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)) + continue; + + /* + * Scan to see if there are any locks belonging to current owner + * or its parent + */ + lockOwners = locallock->lockOwners; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == CurrentResourceOwner) + ic = i; + else if (lockOwners[i].owner == parent) + ip = i; + } + + if (ic < 0) + continue; /* no current locks */ + + if (ip < 0) + { + /* Parent has no slot, so just give it child's slot */ + lockOwners[ic].owner = parent; + } + else + { + /* Merge child's count with parent's */ + lockOwners[ip].nLocks += lockOwners[ic].nLocks; + /* compact out unused slot */ + locallock->numLockOwners--; + if (ic < locallock->numLockOwners) + lockOwners[ic] = lockOwners[locallock->numLockOwners]; + } + } +} + + +/* + * Estimate shared-memory space used for lock tables + */ int LockShmemSize(int maxBackends) { int size = 0; long max_table_size = NLOCKENTS(maxBackends); - size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */ - size += maxBackends * MAXALIGN(sizeof(PGPROC)); /* each MyProc */ - size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData)); /* each lock method */ + /* lock method headers */ + size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData)); /* lockHash table */ size += hash_estimate_size(max_table_size, sizeof(LOCK)); @@ -1336,6 +1716,9 @@ LockShmemSize(int maxBackends) size += hash_estimate_size(max_table_size, sizeof(PROCLOCK)); /* + * Note we count only one pair of hash tables, since the userlocks + * table actually overlays the main one. + * * Since the lockHash entry count above is only an estimate, add 10% * safety margin. */ @@ -1375,9 +1758,6 @@ GetLockStatusData(void) data->nelements = i = proclockTable->hctl->nentries; - if (i == 0) - i = 1; /* avoid palloc(0) if empty table */ - data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i); data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i); data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i); @@ -1416,7 +1796,7 @@ GetLockmodeName(LOCKMODE mode) #ifdef LOCK_DEBUG /* - * Dump all locks in the proc->procHolders list. + * Dump all locks in the MyProc->procLocks list. * * Must have already acquired the masterLock. */ @@ -1424,7 +1804,7 @@ void DumpLocks(void) { PGPROC *proc; - SHM_QUEUE *procHolders; + SHM_QUEUE *procLocks; PROCLOCK *proclock; LOCK *lock; int lockmethodid = DEFAULT_LOCKMETHOD; @@ -1434,7 +1814,7 @@ DumpLocks(void) if (proc == NULL) return; - procHolders = &proc->procHolders; + procLocks = &proc->procLocks; Assert(lockmethodid < NumLockMethods); lockMethodTable = LockMethods[lockmethodid]; @@ -1444,7 +1824,7 @@ DumpLocks(void) if (proc->waitLock) LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0); - proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); while (proclock) @@ -1456,7 +1836,7 @@ DumpLocks(void) PROCLOCK_PRINT("DumpLocks", proclock); LOCK_PRINT("DumpLocks", lock, 0); - proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); } }