1 /*-------------------------------------------------------------------------
4 * POSTGRES low-level lock mechanism
6 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7 * Portions Copyright (c) 1994, Regents of the University of California
11 * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.92 2001/08/23 23:06:38 tgl Exp $
14 * Outside modules can create a lock table and acquire/release
15 * locks. A lock table is a shared memory hash table. When
16 * a process tries to acquire a lock of a type that conflicts
17 * with existing locks, it is put to sleep using the routines
18 * in storage/lmgr/proc.c.
20 * For the most part, this code should be invoked via lmgr.c
21 * or another lock-management module, not directly.
25 * LockAcquire(), LockRelease(), LockMethodTableInit(),
26 * LockMethodTableRename(), LockReleaseAll,
27 * LockCheckConflicts(), GrantLock()
29 *-------------------------------------------------------------------------
33 #include <sys/types.h>
37 #include "access/xact.h"
38 #include "miscadmin.h"
39 #include "storage/proc.h"
40 #include "utils/memutils.h"
41 #include "utils/ps_status.h"
44 /* This configuration variable is used to set the lock table size */
45 int max_locks_per_xact; /* set by guc.c */
47 #define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends))
50 static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
51 LOCK *lock, HOLDER *holder);
52 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
55 static char *lock_mode_names[] =
61 "ShareUpdateExclusiveLock",
63 "ShareRowExclusiveLock",
68 static char *DeadLockMessage = "Deadlock detected.\n\tSee the lock(l) manual page for a possible cause.";
74 * The following configuration options are available for lock debugging:
76 * TRACE_LOCKS -- give a bunch of output what's going on in this file
77 * TRACE_USERLOCKS -- same but for user locks
78 * TRACE_LOCK_OIDMIN -- do not trace locks for tables below this oid
79 * (use to avoid output on system tables)
80 * TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
81 * DEBUG_DEADLOCKS -- currently dumps locks at untimely occasions ;)
83 * Furthermore, but in storage/ipc/spin.c:
84 * TRACE_SPINLOCKS -- trace spinlocks (pretty useless)
86 * Define LOCK_DEBUG at compile time to get all these enabled.
90 int Trace_lock_oidmin = BootstrapObjectIdData;
91 bool Trace_locks = false;
92 bool Trace_userlocks = false;
93 int Trace_lock_table = 0;
94 bool Debug_deadlocks = false;
/*
 * LOCK_DEBUG_ENABLED -- should trace output be produced for this lock?
 *
 * True when (a) tracing is enabled for the lock's method (Trace_locks for
 * the default method, Trace_userlocks for user locks) AND the lock's
 * relation oid is at or above Trace_lock_oidmin (suppresses system-table
 * noise), or (b) the relation oid matches Trace_lock_table exactly.
 *
 * NOTE(review): this excerpt elides lines (return type, braces); code
 * shown is kept verbatim.
 */
98 LOCK_DEBUG_ENABLED(const LOCK *lock)
101 (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
102 || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
103 && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
104 || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
/*
 * LOCK_PRINT -- dump a LOCK's state when tracing is enabled for it:
 * tag (table/rel/db/obj), grantMask, per-mode requested[] and granted[]
 * counts with their totals, wait-queue length, and the mode name.
 *
 * Array slots 1..7 are printed; slot 0 of requested[]/granted[] is unused
 * (lock modes are 1-based).  NOTE(review): excerpt elides some lines.
 */
109 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
111 if (LOCK_DEBUG_ENABLED(lock))
113 "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
114 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
115 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
116 where, MAKE_OFFSET(lock),
117 lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
118 lock->tag.objId.blkno, lock->grantMask,
119 lock->requested[1], lock->requested[2], lock->requested[3],
120 lock->requested[4], lock->requested[5], lock->requested[6],
121 lock->requested[7], lock->nRequested,
122 lock->granted[1], lock->granted[2], lock->granted[3],
123 lock->granted[4], lock->granted[5], lock->granted[6],
124 lock->granted[7], lock->nGranted,
125 lock->waitProcs.size, lock_mode_names[type]);
/*
 * HOLDER_PRINT -- dump a HOLDER's state when tracing is enabled:
 * its tag (lock offset, proc offset, xid) and per-mode holding[] counts
 * with the nHolding total.  The enable test mirrors LOCK_DEBUG_ENABLED
 * but must chase the tag's lock offset back to the LOCK to get the
 * relation oid.  Slots 1..7 printed; slot 0 unused (modes are 1-based).
 *
 * NOTE(review): excerpt elides some lines; code shown is kept verbatim.
 */
130 HOLDER_PRINT(const char *where, const HOLDER *holderP)
133 (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
134 || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
135 && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
136 || (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
139 "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
140 where, MAKE_OFFSET(holderP), holderP->tag.lock,
141 HOLDER_LOCKMETHOD(*(holderP)),
142 holderP->tag.proc, holderP->tag.xid,
143 holderP->holding[1], holderP->holding[2], holderP->holding[3],
144 holderP->holding[4], holderP->holding[5], holderP->holding[6],
145 holderP->holding[7], holderP->nHolding);
148 #else /* not LOCK_DEBUG */
150 #define LOCK_PRINT(where, lock, type)
151 #define HOLDER_PRINT(where, holderP)
153 #endif /* not LOCK_DEBUG */
157 SPINLOCK LockMgrLock; /* in Shmem or created in
158 * CreateSpinlocks() */
161 * These are to simplify/speed up some bit arithmetic.
163 * XXX is a fetch from a static array really faster than a shift?
164 * Wouldn't bet on it...
167 static LOCKMASK BITS_OFF[MAX_LOCKMODES];
168 static LOCKMASK BITS_ON[MAX_LOCKMODES];
173 static bool LockingIsDisabled;
176 * map from lockmethod to the lock table structure
178 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
180 static int NumLockMethods;
183 * InitLocks -- Init the lock module. Create a private data
184 * structure for constructing conflict masks.
/*
 * Loop walks one bit position per lock mode; presumably fills the
 * BITS_ON/BITS_OFF lookup arrays declared above -- the loop body is
 * elided from this excerpt, so confirm against the full source.
 */
193 for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
201 * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
/* Simple setter for the module-wide flag tested by LockAcquire/LockRelease. */
204 LockDisable(bool status)
206 LockingIsDisabled = status;
210 * Boolean function to determine current locking status
/* Read-only accessor for the LockingIsDisabled flag set via LockDisable(). */
213 LockingDisabled(void)
215 return LockingIsDisabled;
219 * Fetch the lock method table associated with a given lock
222 GetLocksMethodTable(LOCK *lock)
224 LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock);
/*
 * NOTE(review): this Assert requires lockmethod > 0, while other callers
 * in this file assert only lockmethod < NumLockMethods -- the extra lower
 * bound here rejects INVALID_LOCKMETHOD (0); confirm both are intended.
 */
226 Assert(lockmethod > 0 && lockmethod < NumLockMethods);
227 return LockMethodTable[lockmethod];
232 * LockMethodInit -- initialize the lock table's lock type
235 * Notes: just copying. Should only be called once.
/*
 * Copies the caller-supplied per-mode conflict masks (conflictsP) and
 * priorities (prioP) into the shared control structure, and records the
 * number of lock modes.  NOTE(review): excerpt elides some lines (e.g.
 * the prioP parameter declaration and braces).
 */
238 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
239 LOCKMASK *conflictsP,
245 lockMethodTable->ctl->numLockModes = numModes;
247 for (i = 0; i < numModes; i++, prioP++, conflictsP++)
249 lockMethodTable->ctl->conflictTab[i] = *conflictsP;
250 lockMethodTable->ctl->prio[i] = *prioP;
255 * LockMethodTableInit -- initialize a lock table structure
258 * (a) a lock table has four separate entries in the shmem index
259 * table. This is because every shared hash table and spinlock
260 * has its name stored in the shmem index at its creation. It
261 * is wasteful, in this case, but not much space is involved.
263 * NOTE: data structures allocated here are allocated permanently, using
264 * TopMemoryContext and shared memory. We don't ever release them anyway,
265 * and in normal multi-backend operation the lock table structures set up
266 * by the postmaster are inherited by each backend, so they must be in
/*
 * Returns the lockmethod ID assigned to the new table, or
 * INVALID_LOCKMETHOD on parameter error; elog(FATAL) on shmem failure.
 * All work is done while holding LockMgrLock.
 * NOTE(review): excerpt elides some lines (braces, a few declarations).
 */
270 LockMethodTableInit(char *tabName,
271 LOCKMASK *conflictsP,
276 LOCKMETHODTABLE *lockMethodTable;
281 long init_table_size,
/* Reject requests needing more modes than conflictTab[]/prio[] can hold. */
284 if (numModes >= MAX_LOCKMODES)
286 elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
287 numModes, MAX_LOCKMODES);
288 return INVALID_LOCKMETHOD;
291 /* Compute init/max size to request for lock hashtables */
292 max_table_size = NLOCKENTS(maxBackends);
/* Start hash tables at 10% of max size; they can grow up to the max. */
293 init_table_size = max_table_size / 10;
295 /* Allocate a string for the shmem index table lookups. */
296 /* This is just temp space in this routine, so palloc is OK. */
297 shmemName = (char *) palloc(strlen(tabName) + 32);
299 /* each lock table has a non-shared, permanent header */
300 lockMethodTable = (LOCKMETHODTABLE *)
301 MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODCTL));
304 * find/acquire the spinlock for the table
306 SpinAcquire(LockMgrLock);
309 * allocate a control structure from shared memory or attach to it if
312 sprintf(shmemName, "%s (ctl)", tabName);
313 lockMethodTable->ctl = (LOCKMETHODCTL *)
314 ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);
316 if (!lockMethodTable->ctl)
317 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
325 * we're first - initialize
329 MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
330 lockMethodTable->ctl->masterLock = LockMgrLock;
331 lockMethodTable->ctl->lockmethod = NumLockMethods;
335 * other modules refer to the lock table by a lockmethod ID
337 LockMethodTable[NumLockMethods] = lockMethodTable;
339 Assert(NumLockMethods <= MAX_LOCK_METHODS);
342 * allocate a hash table for LOCK structs. This is used to store
343 * per-locked-object information.
345 info.keysize = SHMEM_LOCKTAB_KEYSIZE;
346 info.datasize = SHMEM_LOCKTAB_DATASIZE;
347 info.hash = tag_hash;
348 hash_flags = (HASH_ELEM | HASH_FUNCTION);
350 sprintf(shmemName, "%s (lock hash)", tabName);
351 lockMethodTable->lockHash = ShmemInitHash(shmemName,
357 if (!lockMethodTable->lockHash)
358 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
359 Assert(lockMethodTable->lockHash->hash == tag_hash);
362 * allocate a hash table for HOLDER structs. This is used to store
363 * per-lock-holder information.
365 info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
366 info.datasize = SHMEM_HOLDERTAB_DATASIZE;
367 info.hash = tag_hash;
368 hash_flags = (HASH_ELEM | HASH_FUNCTION);
370 sprintf(shmemName, "%s (holder hash)", tabName);
371 lockMethodTable->holderHash = ShmemInitHash(shmemName,
377 if (!lockMethodTable->holderHash)
378 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
380 /* init ctl data structures */
381 LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
383 SpinRelease(LockMgrLock);
387 return lockMethodTable->ctl->lockmethod;
391 * LockMethodTableRename -- allocate another lockmethod ID to the same
394 * NOTES: Both the lock module and the lock chain (lchain.c)
395 * module use table id's to distinguish between different
396 * kinds of locks. Short term and long term locks look
397 * the same to the lock table, but are handled differently
398 * by the lock chain manager. This function allows the
399 * client to use different lockmethods when acquiring/releasing
400 * short term and long term locks, yet store them all in one hashtable.
/*
 * Returns the new lockmethod ID aliasing the given one, or
 * INVALID_LOCKMETHOD if the method array is full or the source slot
 * is empty.
 */
404 LockMethodTableRename(LOCKMETHOD lockmethod)
406 LOCKMETHOD newLockMethod;
408 if (NumLockMethods >= MAX_LOCK_METHODS)
409 return INVALID_LOCKMETHOD;
/*
 * NOTE(review): compares a LOCKMETHODTABLE pointer against the
 * INVALID_LOCKMETHOD id constant; works only because that constant
 * is 0 (i.e., a NULL check) -- confirm against lock.h.
 */
410 if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
411 return INVALID_LOCKMETHOD;
413 /* other modules refer to the lock table by a lockmethod ID */
414 newLockMethod = NumLockMethods;
/* Both IDs now map to the same shared table structure. */
417 LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
418 return newLockMethod;
422 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
423 * set lock if/when no conflicts.
425 * Returns: TRUE if lock was acquired, FALSE otherwise. Note that
426 * a FALSE return is to be expected if dontWait is TRUE;
427 * but if dontWait is FALSE, only a parameter error can cause
428 * a FALSE return. (XXX probably we should just elog on parameter
429 * errors, instead of conflating this with failure to acquire lock?)
431 * Side Effects: The lock is acquired and recorded in lock tables.
433 * NOTE: if we wait for the lock, there is no way to abort the wait
434 * short of aborting the transaction.
437 * Note on User Locks:
439 * User locks are handled totally on the application side as
440 * long term cooperative locks which extend beyond the normal
441 * transaction boundaries. Their purpose is to indicate to an
442 * application that someone is `working' on an item. So it is
443 * possible to put an user lock on a tuple's oid, retrieve the
444 * tuple, work on it for an hour and then update it and remove
445 * the lock. While the lock is active other clients can still
446 * read and write the tuple but they can be aware that it has
447 * been locked at the application level by someone.
448 * User locks use lock tags made of an uint16 and an uint32, for
449 * example 0 and a tuple oid, or any other arbitrary pair of
450 * numbers following a convention established by the application.
451 * In this sense tags don't refer to tuples or database entities.
452 * User locks and normal locks are completely orthogonal and
453 * they don't interfere with each other, so it is possible
454 * to acquire a normal lock on an user-locked tuple or user-lock
455 * a tuple for which a normal write lock already exists.
456 * User locks are always non blocking, therefore they are never
457 * acquired if already held by another process. They must be
458 * released explicitly by the application but they are released
459 * automatically when a backend terminates.
460 * They are indicated by a lockmethod 2 which is an alias for the
461 * normal lock table, and are distinguished from normal locks
462 * by the following differences:
464 * normal lock user lock
467 * tag.dbId database oid database oid
468 * tag.relId rel oid or 0 0
469 * tag.objId block id lock id2
471 * tag.offnum 0 lock id1
472 * holder.xid xid or 0 0
473 * persistence transaction user or backend
476 * The lockmode parameter can have the same values for normal locks
477 * although probably only WRITE_LOCK can have some practical use.
/*
 * NOTE(review): this excerpt elides many lines (braces, declarations,
 * some statements); the code below is kept verbatim from the listing.
 */
483 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
484 TransactionId xid, LOCKMODE lockmode, bool dontWait)
492 LOCKMETHODTABLE *lockMethodTable;
494 int myHolding[MAX_LOCKMODES];
498 if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
499 elog(DEBUG, "LockAcquire: user lock [%u] %s",
500 locktag->objId.blkno, lock_mode_names[lockmode]);
503 /* ???????? This must be changed when short term locks will be used */
504 locktag->lockmethod = lockmethod;
506 Assert(lockmethod < NumLockMethods);
507 lockMethodTable = LockMethodTable[lockmethod];
508 if (!lockMethodTable)
510 elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
514 if (LockingIsDisabled)
517 masterLock = lockMethodTable->ctl->masterLock;
/* All lock-table access below happens under the table's master spinlock. */
519 SpinAcquire(masterLock);
522 * Find or create a lock with this tag
524 Assert(lockMethodTable->lockHash->hash == tag_hash);
525 lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
529 SpinRelease(masterLock);
530 elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
535 * if it's a new lock object, initialize it
541 SHMQueueInit(&(lock->lockHolders));
542 ProcQueueInit(&(lock->waitProcs));
543 lock->nRequested = 0;
545 MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
546 MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
547 LOCK_PRINT("LockAcquire: new", lock, lockmode);
551 LOCK_PRINT("LockAcquire: found", lock, lockmode);
552 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
553 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
554 Assert(lock->nGranted <= lock->nRequested);
558 * Create the hash key for the holder table.
560 MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding,
562 holdertag.lock = MAKE_OFFSET(lock);
563 holdertag.proc = MAKE_OFFSET(MyProc);
564 TransactionIdStore(xid, &holdertag.xid);
567 * Find or create a holder entry with this tag
569 holderTable = lockMethodTable->holderHash;
570 holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
574 SpinRelease(masterLock);
575 elog(FATAL, "LockAcquire: holder table corrupted");
580 * If new, initialize the new entry
584 holder->nHolding = 0;
585 MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
586 /* Add holder to appropriate lists */
587 SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
588 SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
589 HOLDER_PRINT("LockAcquire: new", holder);
593 HOLDER_PRINT("LockAcquire: found", holder);
594 Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
595 Assert(holder->nHolding <= lock->nGranted);
597 #ifdef CHECK_DEADLOCK_RISK
600 * Issue warning if we already hold a lower-level lock on this
601 * object and do not hold a lock of the requested level or higher.
602 * This indicates a deadlock-prone coding practice (eg, we'd have
603 * a deadlock if another backend were following the same code path
604 * at about the same time).
606 * This is not enabled by default, because it may generate log
607 * entries about user-level coding practices that are in fact safe
608 * in context. It can be enabled to help find system-level
611 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
612 * better to use a table. For now, though, this works.
614 for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
616 if (holder->holding[i] > 0)
618 if (i >= (int) lockmode)
619 break; /* safe: we have a lock >= req level */
620 elog(DEBUG, "Deadlock risk: raising lock level"
621 " from %s to %s on object %u/%u/%u",
622 lock_mode_names[i], lock_mode_names[lockmode],
623 lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
627 #endif /* CHECK_DEADLOCK_RISK */
631 * lock->nRequested and lock->requested[] count the total number of
632 * requests, whether granted or waiting, so increment those
633 * immediately. The other counts don't increment till we get the lock.
636 lock->requested[lockmode]++;
637 Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
640 * If I already hold one or more locks of the requested type, just
641 * grant myself another one without blocking.
643 if (holder->holding[lockmode] > 0)
645 GrantLock(lock, holder, lockmode);
646 HOLDER_PRINT("LockAcquire: owning", holder);
647 SpinRelease(masterLock);
652 * If this process (under any XID) is a holder of the lock, also grant
653 * myself another one without blocking.
655 LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
656 if (myHolding[lockmode] > 0)
658 GrantLock(lock, holder, lockmode);
659 HOLDER_PRINT("LockAcquire: my other XID owning", holder);
660 SpinRelease(masterLock);
665 * If lock requested conflicts with locks requested by waiters, must
666 * join wait queue. Otherwise, check for conflict with already-held
667 * locks. (That's last because most complex check.)
669 if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
670 status = STATUS_FOUND;
672 status = LockCheckConflicts(lockMethodTable, lockmode,
676 if (status == STATUS_OK)
678 /* No conflict with held or previously requested locks */
679 GrantLock(lock, holder, lockmode);
683 Assert(status == STATUS_FOUND);
685 * We can't acquire the lock immediately. If caller specified no
686 * blocking, remove the holder entry and return FALSE without waiting.
/* Only delete the holder if this was its sole (attempted) hold. */
690 if (holder->nHolding == 0)
692 SHMQueueDelete(&holder->lockLink);
693 SHMQueueDelete(&holder->procLink);
694 holder = (HOLDER *) hash_search(holderTable,
696 HASH_REMOVE, &found);
697 if (!holder || !found)
698 elog(NOTICE, "LockAcquire: remove holder, table corrupted");
701 HOLDER_PRINT("LockAcquire: NHOLDING", holder);
/* Undo the requested[] increment made above before bailing out. */
703 lock->requested[lockmode]--;
704 LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
705 Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
706 Assert(lock->nGranted <= lock->nRequested);
707 SpinRelease(masterLock);
712 * Construct bitmask of locks this process holds on this object.
/* tmpMask starts at 2 because lock modes are 1-based (bit 1 = mode 1). */
718 for (i = 1, tmpMask = 2;
719 i <= lockMethodTable->ctl->numLockModes;
722 if (myHolding[i] > 0)
723 heldLocks |= tmpMask;
725 MyProc->heldLocks = heldLocks;
729 * Sleep till someone wakes me up.
731 status = WaitOnLock(lockmethod, lockmode, lock, holder);
734 * NOTE: do not do any material change of state between here and
735 * return. All required changes in locktable state must have been
736 * done when the lock was granted to us --- see notes in
741 * Check the holder entry status, in case something in the ipc
742 * communication doesn't work correctly.
744 if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
746 HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
747 LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
748 /* Should we retry ? */
749 SpinRelease(masterLock);
752 HOLDER_PRINT("LockAcquire: granted", holder);
753 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
756 SpinRelease(masterLock);
758 return status == STATUS_OK;
762 * LockCheckConflicts -- test whether requested lock conflicts
763 * with those already granted
765 * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
768 * Here's what makes this complicated: one process's locks don't
769 * conflict with one another, even if they are held under different
770 * transaction IDs (eg, session and xact locks do not conflict).
771 * So, we must subtract off our own locks when determining whether the
772 * requested new lock conflicts with those already held.
774 * The caller can optionally pass the process's total holding counts, if
775 * known. If NULL is passed then these values will be computed internally.
/*
 * NOTE(review): excerpt elides some lines (lock/holder/proc parameter
 * declarations, braces); code shown is kept verbatim.
 */
778 LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
783 int *myHolding) /* myHolding[] array or NULL */
785 LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
786 int numLockModes = lockctl->numLockModes;
790 int localHolding[MAX_LOCKMODES];
793 * first check for global conflicts: If no locks conflict with my
794 * request, then I get the lock.
796 * Checking for conflict: lock->grantMask represents the types of
797 * currently held locks. conflictTable[lockmode] has a bit set for
798 * each type of lock that conflicts with request. Bitwise compare
799 * tells if there is a conflict.
801 if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
803 HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
808 * Rats. Something conflicts. But it could still be my own lock. We
809 * have to construct a conflict mask that does not reflect our own
810 * locks. Locks held by the current process under another XID also
811 * count as "our own locks".
813 if (myHolding == NULL)
815 /* Caller didn't do calculation of total holding for me */
816 LockCountMyLocks(holder->tag.lock, proc, localHolding);
817 myHolding = localHolding;
820 /* Compute mask of lock types held by other processes */
/*
 * A mode bit goes into 'bitmask' only if some OTHER process holds that
 * mode: granted[i] minus my own holds being nonzero means someone else
 * has it.  (granted[i] can never be less than myHolding[i].)
 */
823 for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
825 if (lock->granted[i] != myHolding[i])
830 * now check again for conflicts. 'bitmask' describes the types of
831 * locks held by other processes. If one of these conflicts with the
832 * kind of lock that I want, there is a conflict and I have to sleep.
834 if (!(lockctl->conflictTab[lockmode] & bitmask))
836 /* no conflict. OK to get the lock */
837 HOLDER_PRINT("LockCheckConflicts: resolved", holder);
841 HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
846 * LockCountMyLocks --- Count total number of locks held on a given lockable
847 * object by a given process (under any transaction ID).
849 * XXX This could be rather slow if the process holds a large number of locks.
850 * Perhaps it could be sped up if we kept yet a third hashtable of per-
851 * process lock information. However, for the normal case where a transaction
852 * doesn't hold a large number of locks, keeping such a table would probably
/*
 * Fills myHolding[1..MAX_LOCKMODES-1] with per-mode totals by walking
 * the process's procHolders shmem queue and summing every HOLDER whose
 * tag points at the given lock offset (i.e., across all XIDs).
 */
856 LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
858 SHM_QUEUE *procHolders = &(proc->procHolders);
862 MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
864 holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
865 offsetof(HOLDER, procLink));
869 if (lockOffset == holder->tag.lock)
871 for (i = 1; i < MAX_LOCKMODES; i++)
872 myHolding[i] += holder->holding[i];
875 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
876 offsetof(HOLDER, procLink));
881 * GrantLock -- update the lock and holder data structures to show
882 * the lock request has been granted.
884 * NOTE: if proc was blocked, it also needs to be removed from the wait list
885 * and have its waitLock/waitHolder fields cleared. That's not done here.
/*
 * NOTE(review): excerpt elides some lines; the asserts below reference
 * lock->nGranted > 0 and holder->nHolding > 0, so the elided lines
 * presumably include the nGranted++/nHolding++ increments -- confirm.
 */
888 GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
891 lock->granted[lockmode]++;
892 lock->grantMask |= BITS_ON[lockmode];
/* Once all requests of this mode are granted, nobody waits on it. */
893 if (lock->granted[lockmode] == lock->requested[lockmode])
894 lock->waitMask &= BITS_OFF[lockmode];
895 LOCK_PRINT("GrantLock", lock, lockmode);
896 Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
897 Assert(lock->nGranted <= lock->nRequested);
898 holder->holding[lockmode]++;
900 Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
904 * WaitOnLock -- wait to acquire a lock
906 * Caller must have set MyProc->heldLocks to reflect locks already held
907 * on the lockable object by this process (under all XIDs).
909 * The locktable spinlock must be held at entry.
/*
 * Blocks in ProcSleep until the lock is granted or deadlock is detected.
 * On deadlock, releases the spinlock and elog(ERROR)s -- does not return.
 * NOTE(review): excerpt elides some lines (declarations, braces, and
 * presumably the pfree of old_status/new_status after the wait).
 */
912 WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
913 LOCK *lock, HOLDER *holder)
915 LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
919 Assert(lockmethod < NumLockMethods);
921 LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
/* Append " waiting" to the ps status line so admins can see we're blocked. */
923 old_status = pstrdup(get_ps_display());
924 new_status = (char *) palloc(strlen(old_status) + 10);
925 strcpy(new_status, old_status);
926 strcat(new_status, " waiting");
927 set_ps_display(new_status);
930 * NOTE: Think not to put any shared-state cleanup after the call to
931 * ProcSleep, in either the normal or failure path. The lock state
932 * must be fully set by the lock grantor, or by HandleDeadLock if we
933 * give up waiting for the lock. This is necessary because of the
934 * possibility that a cancel/die interrupt will interrupt ProcSleep
935 * after someone else grants us the lock, but before we've noticed it.
936 * Hence, after granting, the locktable state must fully reflect the
937 * fact that we own the lock; we can't do additional work on return.
938 * Contrariwise, if we fail, any cleanup must happen in xact abort
939 * processing, not here, to ensure it will also happen in the
943 if (ProcSleep(lockMethodTable,
946 holder) != STATUS_OK)
950 * We failed as a result of a deadlock, see HandleDeadLock(). Quit
951 * now. Removal of the holder and lock objects, if no longer
952 * needed, will happen in xact cleanup (see above for motivation).
954 LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
955 SpinRelease(lockMethodTable->ctl->masterLock);
/* elog(ERROR) longjmps out; nothing below here runs on the deadlock path. */
956 elog(ERROR, DeadLockMessage);
960 set_ps_display(old_status);
964 LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
969 * Remove a proc from the wait-queue it is on
970 * (caller must know it is on one).
972 * Locktable lock must be held by caller.
974 * NB: this does not remove the process' holder object, nor the lock object,
975 * even though their counts might now have gone to zero. That will happen
976 * during a subsequent LockReleaseAll call, which we expect will happen
977 * during transaction cleanup. (Removal of a proc from its wait queue by
978 * this routine can only happen if we are aborting the transaction.)
981 RemoveFromWaitQueue(PROC *proc)
983 LOCK *waitLock = proc->waitLock;
984 LOCKMODE lockmode = proc->waitLockMode;
986 /* Make sure proc is waiting */
987 Assert(proc->links.next != INVALID_OFFSET);
989 Assert(waitLock->waitProcs.size > 0);
991 /* Remove proc from lock's wait queue */
992 SHMQueueDelete(&(proc->links));
993 waitLock->waitProcs.size--;
995 /* Undo increments of request counts by waiting process */
996 Assert(waitLock->nRequested > 0);
997 Assert(waitLock->nRequested > proc->waitLock->nGranted);
998 waitLock->nRequested--;
999 Assert(waitLock->requested[lockmode] > 0);
1000 waitLock->requested[lockmode]--;
1001 /* don't forget to clear waitMask bit if appropriate */
1002 if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1003 waitLock->waitMask &= BITS_OFF[lockmode];
1005 /* Clean up the proc's own state */
1006 proc->waitLock = NULL;
1007 proc->waitHolder = NULL;
1009 /* See if any other waiters for the lock can be woken up now */
/* Our departure may have unblocked waiters that conflicted only with us. */
1010 ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
1014 * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
1015 * release one 'lockmode' lock on it.
1017 * Side Effects: find any waiting processes that are now wakable,
1018 * grant them their requested locks and awaken them.
1019 * (We have to grant the lock here to avoid a race between
1020 * the waking process and any new process to
1021 * come along and request the lock.)
/*
 * NOTE(review): excerpt elides many lines (braces, some declarations
 * and returns); code shown is kept verbatim from the listing.
 */
1024 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
1025 TransactionId xid, LOCKMODE lockmode)
1028 SPINLOCK masterLock;
1030 LOCKMETHODTABLE *lockMethodTable;
1032 HOLDERTAG holdertag;
1034 bool wakeupNeeded = false;
1037 if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
1038 elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
1041 /* ???????? This must be changed when short term locks will be used */
1042 locktag->lockmethod = lockmethod;
1044 Assert(lockmethod < NumLockMethods);
1045 lockMethodTable = LockMethodTable[lockmethod];
1046 if (!lockMethodTable)
1048 elog(NOTICE, "lockMethodTable is null in LockRelease");
1052 if (LockingIsDisabled)
1055 masterLock = lockMethodTable->ctl->masterLock;
1056 SpinAcquire(masterLock);
1059 * Find a lock with this tag
1061 Assert(lockMethodTable->lockHash->hash == tag_hash);
1062 lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
1066 * let the caller print its own error message, too. Do not
1071 SpinRelease(masterLock);
1072 elog(NOTICE, "LockRelease: locktable corrupted");
1078 SpinRelease(masterLock);
1079 elog(NOTICE, "LockRelease: no such lock");
1082 LOCK_PRINT("LockRelease: found", lock, lockmode);
1085 * Find the holder entry for this holder.
1087 MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding,
1089 holdertag.lock = MAKE_OFFSET(lock);
1090 holdertag.proc = MAKE_OFFSET(MyProc);
1091 TransactionIdStore(xid, &holdertag.xid);
/*
 * HASH_FIND_SAVE remembers the entry's position so the later
 * HASH_REMOVE_SAVED can delete it without a second lookup.
 */
1093 holderTable = lockMethodTable->holderHash;
1094 holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
1095 HASH_FIND_SAVE, &found);
1096 if (!holder || !found)
1098 SpinRelease(masterLock);
/* User locks are routinely released without being held; softer message. */
1100 if (!found && lockmethod == USER_LOCKMETHOD)
1101 elog(NOTICE, "LockRelease: no lock with this tag");
1104 elog(NOTICE, "LockRelease: holder table corrupted");
1107 HOLDER_PRINT("LockRelease: found", holder);
1110 * Check that we are actually holding a lock of the type we want to
1113 if (!(holder->holding[lockmode] > 0))
1115 HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
1116 Assert(holder->holding[lockmode] >= 0);
1117 SpinRelease(masterLock);
1118 elog(NOTICE, "LockRelease: you don't own a lock of type %s",
1119 lock_mode_names[lockmode]);
1122 Assert(holder->nHolding > 0);
1123 Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1124 Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1125 Assert(lock->nGranted <= lock->nRequested);
1128 * fix the general lock stats
1131 lock->requested[lockmode]--;
1133 lock->granted[lockmode]--;
1135 if (lock->granted[lockmode] == 0)
1137 /* change the conflict mask. No more of this lock type. */
1138 lock->grantMask &= BITS_OFF[lockmode];
1141 LOCK_PRINT("LockRelease: updated", lock, lockmode);
1142 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1143 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1144 Assert(lock->nGranted <= lock->nRequested);
1147 * We need only run ProcLockWakeup if the released lock conflicts with
1148 * at least one of the lock types requested by waiter(s). Otherwise
1149 * whatever conflict made them wait must still exist. NOTE: before
1150 * MVCC, we could skip wakeup if lock->granted[lockmode] was still
1151 * positive. But that's not true anymore, because the remaining
1152 * granted locks might belong to some waiter, who could now be
1153 * awakened because he doesn't conflict with his own locks.
1155 if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
1156 wakeupNeeded = true;
1158 if (lock->nRequested == 0)
1162 * if there's no one waiting in the queue, we just released the
1163 * last lock on this object. Delete it from the lock table.
1165 Assert(lockMethodTable->lockHash->hash == tag_hash);
1166 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1167 (Pointer) &(lock->tag),
1170 if (!lock || !found)
1172 SpinRelease(masterLock);
1173 elog(NOTICE, "LockRelease: remove lock, table corrupted");
1176 wakeupNeeded = false; /* should be false, but make sure */
1180 * Now fix the per-holder lock stats.
1182 holder->holding[lockmode]--;
1184 HOLDER_PRINT("LockRelease: updated", holder);
1185 Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
1188 * If this was my last hold on this lock, delete my entry in the
1191 if (holder->nHolding == 0)
1193 HOLDER_PRINT("LockRelease: deleting", holder);
1194 SHMQueueDelete(&holder->lockLink);
1195 SHMQueueDelete(&holder->procLink);
/*
 * NOTE(review): '&holder' passes the address of the local pointer, not
 * the entry's tag; apparently harmless because HASH_REMOVE_SAVED reuses
 * the position saved by HASH_FIND_SAVE above and ignores the key --
 * confirm against dynahash's hash_search contract.
 */
1196 holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
1197 HASH_REMOVE_SAVED, &found);
1198 if (!holder || !found)
1200 SpinRelease(masterLock);
1201 elog(NOTICE, "LockRelease: remove holder, table corrupted");
1207 * Wake up waiters if needed.
1210 ProcLockWakeup(lockMethodTable, lock);
1212 SpinRelease(masterLock);
1217 * LockReleaseAll -- Release all locks in a process's lock list.
1219 * Well, not really *all* locks.
1221 * If 'allxids' is TRUE, all locks of the specified lock method are
1222 * released, regardless of transaction affiliation.
1224 * If 'allxids' is FALSE, all locks of the specified lock method and
1225 * specified XID are released.
1228 LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
1229 bool allxids, TransactionId xid)
1231 SHM_QUEUE *procHolders = &(proc->procHolders);
1234 SPINLOCK masterLock;
1235 LOCKMETHODTABLE *lockMethodTable;
/* Tracing is controlled by a separate flag for user locks vs. regular locks. */
1242 if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1243 elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
1244 lockmethod, proc->pid);
/* Validate the lock method before touching any shared state. */
1247 Assert(lockmethod < NumLockMethods);
1248 lockMethodTable = LockMethodTable[lockmethod];
1249 if (!lockMethodTable)
1251 elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
1255 numLockModes = lockMethodTable->ctl->numLockModes;
1256 masterLock = lockMethodTable->ctl->masterLock;
/*
 * All examination and modification of the shared lock and holder
 * tables below happens while holding the master spinlock.
 */
1258 SpinAcquire(masterLock);
/* Walk every HOLDER owned by this backend, threaded through procLink. */
1260 holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1261 offsetof(HOLDER, procLink));
1265 bool wakeupNeeded = false;
1267 /* Get link first, since we may unlink/delete this holder */
1268 nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1269 offsetof(HOLDER, procLink));
1271 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1273 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1275 /* Ignore items that are not of the lockmethod to be removed */
1276 if (LOCK_LOCKMETHOD(*lock) != lockmethod)
1279 /* If not allxids, ignore items that are of the wrong xid */
1280 if (!allxids && !TransactionIdEquals(xid, holder->tag.xid))
1283 HOLDER_PRINT("LockReleaseAll", holder);
1284 LOCK_PRINT("LockReleaseAll", lock, 0);
1285 Assert(lock->nRequested >= 0);
1286 Assert(lock->nGranted >= 0);
1287 Assert(lock->nGranted <= lock->nRequested);
1288 Assert(holder->nHolding >= 0);
1289 Assert(holder->nHolding <= lock->nRequested);
1292 * fix the general lock stats
/*
 * Partial case: other holders still reference this lock, so subtract
 * our per-mode counts from the lock's totals, mode by mode, and clear
 * grantMask bits for modes that drop to zero.
 */
1294 if (lock->nRequested != holder->nHolding)
1296 for (i = 1; i <= numLockModes; i++)
1298 Assert(holder->holding[i] >= 0);
1299 if (holder->holding[i] > 0)
1301 lock->requested[i] -= holder->holding[i];
1302 lock->granted[i] -= holder->holding[i];
1303 Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
1304 if (lock->granted[i] == 0)
1305 lock->grantMask &= BITS_OFF[i];
1308 * Read comments in LockRelease
1310 if (!wakeupNeeded &&
1311 lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
1312 wakeupNeeded = true;
1315 lock->nRequested -= holder->nHolding;
1316 lock->nGranted -= holder->nHolding;
1317 Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1318 Assert(lock->nGranted <= lock->nRequested);
1323 * This holder accounts for all the requested locks on the
1324 * object, so we can be lazy and just zero things out.
1326 lock->nRequested = 0;
1328 /* Fix the lock status, just for next LOCK_PRINT message. */
1329 for (i = 1; i <= numLockModes; i++)
1331 Assert(lock->requested[i] == lock->granted[i]);
1332 lock->requested[i] = lock->granted[i] = 0;
1335 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1337 HOLDER_PRINT("LockReleaseAll: deleting", holder);
1340 * Remove the holder entry from the linked lists
1342 SHMQueueDelete(&holder->lockLink);
1343 SHMQueueDelete(&holder->procLink);
1346 * remove the holder entry from the hashtable
1348 holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
1352 if (!holder || !found)
/* Shared-memory corruption: release the spinlock before reporting. */
1354 SpinRelease(masterLock);
1355 elog(NOTICE, "LockReleaseAll: holder table corrupted");
/*
 * If nothing (including waiters) references the lock any longer,
 * garbage-collect the LOCK object itself from the shared lock table.
 */
1359 if (lock->nRequested == 0)
1363 * We've just released the last lock, so garbage-collect the
1366 LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1367 Assert(lockMethodTable->lockHash->hash == tag_hash);
1368 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1369 (Pointer) &(lock->tag),
1370 HASH_REMOVE, &found);
1371 if (!lock || !found)
1373 SpinRelease(masterLock);
1374 elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
/* Otherwise, releasing our holds may have unblocked some waiter. */
1378 else if (wakeupNeeded)
1379 ProcLockWakeup(lockMethodTable, lock);
/* Advance using the link saved before this holder was deleted. */
1382 holder = nextHolder;
1385 SpinRelease(masterLock);
1388 if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1389 elog(DEBUG, "LockReleaseAll: done");
/*
 * LockShmemSize -- estimate the shared-memory space needed by the lock
 * manager for 'maxBackends' backends: the PROC header, one PROC per
 * backend, one LOCKMETHODCTL per lock method, and the two shared hash
 * tables (lock table and holder table), each sized for
 * NLOCKENTS(maxBackends) entries.
 */
1396 LockShmemSize(int maxBackends)
1399 long max_table_size = NLOCKENTS(maxBackends);
1401 size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1402 size += maxBackends * MAXALIGN(sizeof(PROC)); /* each MyProc */
1403 size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each
1404 * lockMethodTable->ctl */
1406 /* lockHash table */
1407 size += hash_estimate_size(max_table_size,
1408 SHMEM_LOCKTAB_KEYSIZE,
1409 SHMEM_LOCKTAB_DATASIZE);
1411 /* holderHash table */
1412 size += hash_estimate_size(max_table_size,
1413 SHMEM_HOLDERTAB_KEYSIZE,
1414 SHMEM_HOLDERTAB_DATASIZE);
1417 * Since the lockHash entry count above is only an estimate, add 10%
1428 * Dump all locks in the proc->procHolders list.
1430 * Must have already acquired the masterLock.
1435 SHMEM_OFFSET location;
1437 SHM_QUEUE *procHolders;
1440 int lockmethod = DEFAULT_LOCKMETHOD;
1441 LOCKMETHODTABLE *lockMethodTable;
/* Locate our own PROC struct via the shared-memory PID index. */
1443 ShmemPIDLookup(MyProcPid, &location);
1444 if (location == INVALID_OFFSET)
1446 proc = (PROC *) MAKE_PTR(location);
1449 procHolders = &proc->procHolders;
/* Only the default (table-level) lock method is dumped here. */
1451 Assert(lockmethod < NumLockMethods);
1452 lockMethodTable = LockMethodTable[lockmethod];
1453 if (!lockMethodTable)
1457 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
/* Walk every HOLDER owned by this process and print it with its LOCK. */
1459 holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1460 offsetof(HOLDER, procLink));
1464 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1466 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1468 HOLDER_PRINT("DumpLocks", holder);
1469 LOCK_PRINT("DumpLocks", lock, 0);
1471 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1472 offsetof(HOLDER, procLink));
1477 * Dump all postgres locks. Must have already acquired the masterLock.
1482 SHMEM_OFFSET location;
1484 HOLDER *holder = NULL;
1487 int lockmethod = DEFAULT_LOCKMETHOD;
1488 LOCKMETHODTABLE *lockMethodTable;
1490 HASH_SEQ_STATUS status;
/* Locate the PROC struct for the requested backend PID. */
1493 ShmemPIDLookup(pid, &location);
1494 if (location == INVALID_OFFSET)
1496 proc = (PROC *) MAKE_PTR(location);
1500 Assert(lockmethod < NumLockMethods);
1501 lockMethodTable = LockMethodTable[lockmethod];
1502 if (!lockMethodTable)
1505 holderTable = lockMethodTable->holderHash;
1508 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
/*
 * Sequentially scan the entire shared holder table.  In this dynahash
 * version, hash_seq_search signals end-of-table by returning the
 * sentinel (HOLDER *) TRUE, hence the second loop condition.
 */
1510 hash_seq_init(&status, holderTable);
1511 while ((holder = (HOLDER *) hash_seq_search(&status)) &&
1512 (holder != (HOLDER *) TRUE))
1514 HOLDER_PRINT("DumpAllLocks", holder);
/* A holder should always reference a lock; NULL suggests corruption. */
1516 if (holder->tag.lock)
1518 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1519 LOCK_PRINT("DumpAllLocks", lock, 0);
1522 elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
1526 #endif /* LOCK_DEBUG */