1 /*-------------------------------------------------------------------------
4 * simple lock acquisition
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.32 1998/06/30 02:33:31 momjian Exp $
13 * Outside modules can create a lock table and acquire/release
14 * locks. A lock table is a shared memory hash table. When
15 * a process tries to acquire a lock of a type that conflicts
16 * with existing locks, it is put to sleep using the routines
17 * in storage/lmgr/proc.c.
21 * LockAcquire(), LockRelease(), LockMethodTableInit().
23 * LockReplace() is called only within this module and by the
24 * lkchain module. It releases a lock without looking
25 * the lock up in the lock table.
27 * NOTE: This module is used to define new lock tables. The
28 * multi-level lock table (multi.c) used by the heap
29 * access methods calls these routines. See multi.c for
30 * examples showing how to use this interface.
32 *-------------------------------------------------------------------------
34 #include <stdio.h> /* for sprintf() */
36 #include <sys/types.h>
40 #include "miscadmin.h"
41 #include "storage/shmem.h"
42 #include "storage/sinvaladt.h"
43 #include "storage/spin.h"
44 #include "storage/proc.h"
45 #include "storage/lock.h"
46 #include "utils/dynahash.h"
47 #include "utils/hsearch.h"
48 #include "utils/memutils.h"
49 #include "utils/palloc.h"
50 #include "access/xact.h"
51 #include "access/transam.h"
54 WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
56 /*#define LOCK_MGR_DEBUG*/
/*
 * Debug tracing support.  Without LOCK_MGR_DEBUG the macros below expand
 * to nothing.  NOTE(review): this listing appears to have lines elided,
 * so several macro bodies below look incomplete relative to the original
 * file -- verify against the full source before editing.
 */
58 #ifndef LOCK_MGR_DEBUG
60 #define LOCK_PRINT(where,tag,type)
61 #define LOCK_DUMP(where,lock,type)
62 #define LOCK_DUMP_AUX(where,lock,type)
63 #define XID_PRINT(where,xidentP)
/* Debug build: OID threshold filter and (elided) lock type name table. */
65 #else							/* LOCK_MGR_DEBUG */
68 unsigned int lock_debug_oid_min = BootstrapObjectIdData;
69 static char *lock_types[] = {
/* Print a lock tag (rel/db/tid) when lockDebug is on and the relation's
 * OID is at or above lock_debug_oid_min. */
78 #define LOCK_PRINT(where,tag,type)\
79 	if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
81 	"%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
83 	tag->relId, tag->dbId, \
84 	((tag->tupleId.ip_blkid.bi_hi<<16)+\
85 	tag->tupleId.ip_blkid.bi_lo),\
86 	tag->tupleId.ip_posid, \
89 #define LOCK_DUMP(where,lock,type)\
90 	if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
91 		LOCK_DUMP_AUX(where,lock,type)
/* Dump a full LOCK struct: tag, nHolding and the per-mode holder counts. */
93 #define LOCK_DUMP_AUX(where,lock,type)\
95 	"%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
96 	"holders (%d,%d,%d,%d,%d) type (%s)",where, \
98 	lock->tag.relId, lock->tag.dbId, \
99 	((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
100 	lock->tag.tupleId.ip_blkid.bi_lo),\
101 	lock->tag.tupleId.ip_posid, \
110 #define XID_PRINT(where,xidentP)\
111 	if ((lockDebug >= 2) && \
112 		(((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
113 		 >= lock_debug_oid_min)) \
115 	"%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
116 	"holders (%d,%d,%d,%d,%d)",\
123 	xidentP->holders[1],\
124 	xidentP->holders[2],\
125 	xidentP->holders[3],\
126 	xidentP->holders[4],\
129 #endif	 /* LOCK_MGR_DEBUG */
131 SPINLOCK	LockMgrLock;		/* in Shmem or created in
132 								 * CreateSpinlocks() */
134 /* This is to simplify/speed up some bit arithmetic */
/* Per-lockmode masks used to clear (BITS_OFF) or set (BITS_ON) a mode's
 * bit in a lock's conflict mask; filled in by InitLocks(). */
136 static MASK BITS_OFF[MAX_LOCKMODES];
137 static MASK BITS_ON[MAX_LOCKMODES];
140  * XXX Want to move this to this file
/* When true, LockAcquire/LockRelease/LockReleaseAll return immediately
 * without touching the lock tables (see LockDisable below). */
143 static bool LockingIsDisabled;
145 /* -------------------
146  * map from lockmethod to the lock table structure
147  * -------------------
149 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
/* Count of lock method tables handed out so far (see LockMethodTableInit). */
151 static int	NumLockMethods;
153 /* -------------------
154  * InitLocks -- Init the lock module.  Create a private data
155  *		structure for constructing conflict masks.
156  * -------------------
165 	/* -------------------
166 	 * remember 0th lockmode is invalid
167 	 * -------------------
169 	for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
	/* loop body elided in this listing -- presumably fills BITS_ON/BITS_OFF
	 * with the single-bit mask for each lockmode; confirm against original */
176 /* -------------------
177  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
/* Toggle the module-wide disable flag; nonzero status disables locking. */
181 LockDisable(int status)
183 	LockingIsDisabled = status;
188  * LockMethodInit -- initialize the lock table's lock type
191  * Notes: just copying.  Should only be called once.
194 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
	/* copy the caller's conflict table and priority table into the
	 * method table's shared control structure, one entry per mode */
201 	lockMethodTable->ctl->numLockModes = numModes;
203 	for (i = 0; i < numModes; i++, prioP++, conflictsP++)
205 		lockMethodTable->ctl->conflictTab[i] = *conflictsP;
206 		lockMethodTable->ctl->prio[i] = *prioP;
211  * LockMethodTableInit -- initialize a lock table structure
214  * (a) a lock table has four separate entries in the shmem index
215  * table.  This is because every shared hash table and spinlock
216  * has its name stored in the shmem index at its creation.  It
217  * is wasteful, in this case, but not much space is involved.
/* Returns the new LOCKMETHOD id on success, INVALID_TABLEID on failure. */
221 LockMethodTableInit(char *tabName,
226 	LOCKMETHODTABLE *lockMethodTable;
233 	if (numModes > MAX_LOCKMODES)
235 		elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
236 			 numModes, MAX_LOCKMODES);
237 		return (INVALID_TABLEID);
240 	/* allocate a string for the shmem index table lookup */
241 	shmemName = (char *) palloc((unsigned) (strlen(tabName) + 32));
244 		elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
245 		return (INVALID_TABLEID);
248 	/* each lock table has a non-shared header */
249 	lockMethodTable = (LOCKMETHODTABLE *) palloc((unsigned) sizeof(LOCKMETHODTABLE));
250 	if (!lockMethodTable)
252 		elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
254 		return (INVALID_TABLEID);
257 	/* ------------------------
258 	 * find/acquire the spinlock for the table
259 	 * ------------------------
261 	SpinAcquire(LockMgrLock);
264 	/* -----------------------
265 	 * allocate a control structure from shared memory or attach to it
266 	 * if it already exists.
267 	 * -----------------------
269 	sprintf(shmemName, "%s (ctl)", tabName);
270 	lockMethodTable->ctl = (LOCKMETHODCTL *)
271 		ShmemInitStruct(shmemName, (unsigned) sizeof(LOCKMETHODCTL), &found);
273 	if (!lockMethodTable->ctl)
275 		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
279 	/* -------------------
281 	 * -------------------
286 		 * we're first - initialize
291 		MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
292 		lockMethodTable->ctl->masterLock = LockMgrLock;
293 		lockMethodTable->ctl->lockmethod = NumLockMethods;
296 	/* --------------------
297 	 * other modules refer to the lock table by a lockmethod
298 	 * --------------------
	/* NOTE(review): the increment of NumLockMethods appears elided in this
	 * listing; verify it happens before this slot assignment */
300 	LockMethodTable[NumLockMethods] = lockMethodTable;
302 	Assert(NumLockMethods <= MAX_LOCK_METHODS);
304 	/* ----------------------
305 	 * allocate a hash table for the lock tags.  This is used
306 	 * to find the different locks.
307 	 * ----------------------
309 	info.keysize = sizeof(LOCKTAG);
310 	info.datasize = sizeof(LOCK);
311 	info.hash = tag_hash;
312 	hash_flags = (HASH_ELEM | HASH_FUNCTION);
314 	sprintf(shmemName, "%s (lock hash)", tabName);
315 	lockMethodTable->lockHash = (HTAB *) ShmemInitHash(shmemName,
316 						 INIT_TABLE_SIZE, MAX_TABLE_SIZE,
	/* NOTE(review): this Assert dereferences lockHash before the NULL
	 * check on the next line -- the order looks inverted */
319 	Assert(lockMethodTable->lockHash->hash == tag_hash);
320 	if (!lockMethodTable->lockHash)
322 		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
326 	/* -------------------------
327 	 * allocate an xid table.  When different transactions hold
328 	 * the same lock, additional information must be saved (locks per tx).
329 	 * -------------------------
331 	info.keysize = XID_TAGSIZE;
332 	info.datasize = sizeof(XIDLookupEnt);
333 	info.hash = tag_hash;
334 	hash_flags = (HASH_ELEM | HASH_FUNCTION);
336 	sprintf(shmemName, "%s (xid hash)", tabName);
337 	lockMethodTable->xidHash = (HTAB *) ShmemInitHash(shmemName,
338 						INIT_TABLE_SIZE, MAX_TABLE_SIZE,
341 	if (!lockMethodTable->xidHash)
343 		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
347 	/* init ctl data structures */
348 	LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
350 	SpinRelease(LockMgrLock);
355 	return (lockMethodTable->ctl->lockmethod);
357 	return (INVALID_TABLEID);
361  * LockMethodTableRename -- allocate another lockmethod to the same
364  * NOTES: Both the lock module and the lock chain (lchain.c)
365  *		module use table id's to distinguish between different
366  *		kinds of locks.  Short term and long term locks look
367  *		the same to the lock table, but are handled differently
368  *		by the lock chain manager.	This function allows the
369  *		client to use different lockmethods when acquiring/releasing
370  *		short term and long term locks.
/* Returns the new LOCKMETHOD alias, or INVALID_TABLEID if the table of
 * methods is full or the source lockmethod is not initialized. */
374 LockMethodTableRename(LOCKMETHOD lockmethod)
376 	LOCKMETHOD	newLockMethod;
378 	if (NumLockMethods >= MAX_LOCK_METHODS)
379 		return (INVALID_TABLEID);
	/* NOTE(review): comparing a LOCKMETHODTABLE pointer against
	 * INVALID_TABLEID looks type-confused; probably intended as a NULL
	 * (uninitialized slot) check -- confirm against the original */
380 	if (LockMethodTable[lockmethod] == INVALID_TABLEID)
381 		return (INVALID_TABLEID);
383 	/* other modules refer to the lock table by a lockmethod */
384 	newLockMethod = NumLockMethods;
	/* NOTE(review): the NumLockMethods increment appears elided in this
	 * listing -- both method ids end up sharing one table structure */
387 	LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
388 	return (newLockMethod);
393  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
394  *		set lock if/when no conflicts.
396  * Returns: TRUE if parameters are correct, FALSE otherwise.
398  * Side Effects: The lock is always acquired.  No way to abort
399  *		a lock acquisition other than aborting the transaction.
400  *		Lock is recorded in the lkchain.
402  * Note on User Locks:
403  *		User locks are handled totally on the application side as
404  *		long term cooperative locks which extend beyond the normal
405  *		transaction boundaries.  Their purpose is to indicate to an
406  *		application that someone is `working' on an item.  So it is
407  *		possible to put an user lock on a tuple's oid, retrieve the
408  *		tuple, work on it for an hour and then update it and remove
409  *		the lock.  While the lock is active other clients can still
410  *		read and write the tuple but they can be aware that it has
411  *		been locked at the application level by someone.
412  *		User locks use lock tags made of an uint16 and an uint32, for
413  *		example 0 and a tuple oid, or any other arbitrary pair of
414  *		numbers following a convention established by the application.
415  *		In this sense tags don't refer to tuples or database entities.
416  *		User locks and normal locks are completely orthogonal and
417  *		they don't interfere with each other, so it is possible
418  *		to acquire a normal lock on an user-locked tuple or user-lock
419  *		a tuple for which a normal write lock already exists.
420  *		User locks are always non blocking, therefore they are never
421  *		acquired if already held by another process.  They must be
422  *		released explicitly by the application but they are released
423  *		automatically when a backend terminates.
424  *		They are indicated by a dummy lockmethod 0 which doesn't have
425  *		any table allocated but uses the normal lock table, and are
426  *		distinguished from normal locks for the following differences:
428  *										normal lock		user lock
431  *		tag.relId						rel oid			0
432  *		tag.ItemPointerData.ip_blkid	block id		lock id2
433  *		tag.ItemPointerData.ip_posid	tuple offset	lock id1
434  *		xid.pid							0				backend pid
435  *		xid.xid							current xid		0
436  *		persistence						transaction		user or backend
438  * The lockmode parameter can have the same values for normal locks
439  * although probably only WRITE_LOCK can have some practical use.
446 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
448 	XIDLookupEnt *result,
454 	LOCKMETHODTABLE *lockMethodTable;
	/* lockmethod 0 is the dummy method reserved for user locks */
461 	is_user_lock = (lockmethod == 0);
465 #ifdef USER_LOCKS_DEBUG
466 		elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
467 			 locktag->tupleId.ip_posid,
468 			 ((locktag->tupleId.ip_blkid.bi_hi << 16) +
469 			  locktag->tupleId.ip_blkid.bi_lo),
475 	Assert(lockmethod < NumLockMethods);
476 	lockMethodTable = LockMethodTable[lockmethod];
477 	if (!lockMethodTable)
479 		elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
	/* locking disabled (see LockDisable): succeed without doing anything */
483 	if (LockingIsDisabled)
486 	LOCK_PRINT("Acquire", locktag, lockmode);
487 	masterLock = lockMethodTable->ctl->masterLock;
	/* all lock-table state below is protected by the master spinlock */
489 	SpinAcquire(masterLock);
491 	Assert(lockMethodTable->lockHash->hash == tag_hash);
	/* find the LOCK object for this tag, creating it if not present */
492 	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_ENTER, &found);
496 		SpinRelease(masterLock);
497 		elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
501 	/* --------------------
502 	 * if there was nothing else there, complete initialization
503 	 * --------------------
508 		ProcQueueInit(&(lock->waitProcs));
509 		MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
510 		MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
514 		Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
515 							 &(locktag->tupleId.ip_blkid)));
519 	/* ------------------
520 	 * add an element to the lock queue so that we can clear the
521 	 * locks at end of transaction.
524 	xidTable = lockMethodTable->xidHash;
525 	myXid = GetCurrentTransactionId();
527 	/* ------------------
528 	 * Zero out all of the tag bytes (this clears the padding bytes for long
529 	 * word alignment and ensures hashing consistency).
532 	MemSet(&item, 0, XID_TAGSIZE);		/* must clear padding, needed */
533 	TransactionIdStore(myXid, &item.tag.xid);
534 	item.tag.lock = MAKE_OFFSET(lock);
536 	item.tag.pid = MyPid;
	/* user locks are keyed by backend pid rather than transaction id */
542 		item.tag.pid = MyProcPid;
543 		item.tag.xid = myXid = 0;
544 #ifdef USER_LOCKS_DEBUG
545 		elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
546 			 item.tag.lock, item.tag.pid, item.tag.xid);
	/* find or create this transaction's (xid, lock) holdings entry */
551 	result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
554 		elog(NOTICE, "LockAcquire: xid table corrupted");
555 		return (STATUS_ERROR);
559 		XID_PRINT("LockAcquire: queueing XidEnt", result);
560 		ProcAddLock(&result->queue);
561 		result->nHolding = 0;
562 		MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
566 	 * lock->nholding tells us how many processes have _tried_ to
567 	 * acquire this lock,  Regardless of whether they succeeded or
568 	 * failed in doing so.
572 	lock->holders[lockmode]++;
574 	/* --------------------
575 	 * If I'm the only one holding a lock, then there
576 	 * cannot be a conflict.	Need to subtract one from the
577 	 * lock's count since we just bumped the count up by 1
579 	 * --------------------
581 	if (result->nHolding == lock->nActive)
583 		result->holders[lockmode]++;
585 		GrantLock(lock, lockmode);
586 		SpinRelease(masterLock);
590 	Assert(result->nHolding <= lock->nActive);
	/* check conflicts against locks held by OTHER transactions */
592 	status = LockResolveConflicts(lockmethod, lock, lockmode, myXid);
594 	if (status == STATUS_OK)
595 		GrantLock(lock, lockmode);
596 	else if (status == STATUS_FOUND)
601 		 * User locks are non blocking. If we can't acquire a lock remove
602 		 * the xid entry and return FALSE without waiting.
606 			if (!result->nHolding)
608 				SHMQueueDelete(&result->queue);
609 				hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
612 			lock->holders[lockmode]--;
613 			SpinRelease(masterLock);
614 #ifdef USER_LOCKS_DEBUG
615 			elog(NOTICE, "LockAcquire: user lock failed");
		/* conflicting lock held by another xact: sleep until granted */
620 		status = WaitOnLock(lockmethod, lock, lockmode);
621 		XID_PRINT("Someone granted me the lock", result);
624 	SpinRelease(masterLock);
626 	return (status == STATUS_OK);
629 /* ----------------------------
630  * LockResolveConflicts -- test for lock conflicts
633  * Here's what makes this complicated:  one transaction's
634  * locks don't conflict with one another.  When many processes
635  * hold locks, each has to subtract off the other's locks when
636  * determining whether or not any new lock acquired conflicts with
639  * For example, if I am already holding a WRITE_INTENT lock,
640  * there will not be a conflict with my own READ_LOCK.	If I
641  * don't consider the intent lock when checking for conflicts,
642  * I find no conflict.
643  * ----------------------------
646 LockResolveConflicts(LOCKMETHOD lockmethod,
651 	XIDLookupEnt *result,
661 	numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
662 	xidTable = LockMethodTable[lockmethod]->xidHash;
664 	/* ---------------------
665 	 * read my own statistics from the xid table.  If there
666 	 * isn't an entry, then we'll just add one.
668 	 * Zero out the tag, this clears the padding bytes for long
669 	 * word alignment and ensures hashing consistency.
672 	MemSet(&item, 0, XID_TAGSIZE);
673 	TransactionIdStore(xid, &item.tag.xid);
674 	item.tag.lock = MAKE_OFFSET(lock);
	/* look up (or create) my xid entry so my own holdings can be
	 * excluded from the conflict computation below */
679 	if (!(result = (XIDLookupEnt *)
680 		  hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
682 		elog(NOTICE, "LockResolveConflicts: xid table corrupted");
683 		return (STATUS_ERROR);
685 	myHolders = result->holders;
690 		 * we're not holding any type of lock yet.  Clear
694 		MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
695 		result->nHolding = 0;
699 	/* ------------------------
700 	 * If someone with a greater priority is waiting for the lock,
701 	 * do not continue and share the lock, even if we can.	bjm
702 	 * ------------------------
704 		int			myprio = LockMethodTable[lockmethod]->ctl->prio[lockmode];
705 		PROC_QUEUE *waitQueue = &(lock->waitProcs);
706 		PROC	   *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
708 		if (waitQueue->size && topproc->prio > myprio)
712 	/* ----------------------------
713 	 * first check for global conflicts: If no locks conflict
714 	 * with mine, then I get the lock.
716 	 * Checking for conflict: lock->mask represents the types of
717 	 * currently held locks.  conflictTable[lockmode] has a bit
718 	 * set for each type of lock that conflicts with mine.	Bitwise
719 	 * compare tells if there is a conflict.
720 	 * ----------------------------
722 	if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
725 		result->holders[lockmode]++;
728 		XID_PRINT("Conflict Resolved: updated xid entry stats", result);
733 	/* ------------------------
734 	 * Rats.  Something conflicts. But it could still be my own
735 	 * lock.  We have to construct a conflict mask
736 	 * that does not reflect our own locks.
737 	 * ------------------------
741 	for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
743 		if (lock->activeHolders[i] != myHolders[i])
747 	/* ------------------------
748 	 * now check again for conflicts.  'bitmask' describes the types
749 	 * of locks held by other processes.  If one of these
750 	 * conflicts with the kind of lock that I want, there is a
751 	 * conflict and I have to sleep.
752 	 * ------------------------
754 	if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
757 		/* no conflict. Get the lock and go on */
759 		result->holders[lockmode]++;
762 		XID_PRINT("Conflict Resolved: updated xid entry stats", result);
	/* a real conflict with another xact: caller must wait */
768 	return (STATUS_FOUND);
/*
 * WaitOnLock -- put myself on the lock's wait queue (ordered by lockmode
 * priority) and sleep in ProcSleep() until the lock is granted, or raise
 * an ERROR if the sleep was aborted (e.g. by deadlock handling).
 */
772 WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
774 	PROC_QUEUE *waitQueue = &(lock->waitProcs);
775 	LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
776 	int			prio = lockMethodTable->ctl->prio[lockmode];
778 	Assert(lockmethod < NumLockMethods);
781 	 * the waitqueue is ordered by priority. I insert myself according to
782 	 * the priority of the lock I am acquiring.
784 	 * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
785 	 * synchronization for this queue.	That will not be true if/when
786 	 * people can be deleted from the queue by a SIGINT or something.
788 	LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
789 	if (ProcSleep(waitQueue,
790 				  lockMethodTable->ctl->masterLock,
795 		/* -------------------
796 		 * This could have happened as a result of a deadlock, see HandleDeadLock()
797 		 * Decrement the lock nHolding and holders fields as we are no longer
798 		 * waiting on this lock.
799 		 * -------------------
802 		lock->holders[lockmode]--;
803 		LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockmode);
804 		SpinRelease(lockMethodTable->ctl->masterLock);
805 		elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
808 	LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
813  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
816  * Side Effects: if the lock no longer conflicts with the highest
817  *		priority waiting process, that process is granted the lock
818  *		and awoken. (We have to grant the lock here to avoid a
819  *		race between the waking process and any new process to
820  *		come along and request the lock).
823 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
828 	LOCKMETHODTABLE *lockMethodTable;
829 	XIDLookupEnt *result,
832 	bool		wakeupNeeded = true;
	/* lockmethod 0 is the dummy method reserved for user locks */
837 	is_user_lock = (lockmethod == 0);
841 #ifdef USER_LOCKS_DEBUG
842 		elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
843 			 locktag->tupleId.ip_posid,
844 			 ((locktag->tupleId.ip_blkid.bi_hi << 16) +
845 			  locktag->tupleId.ip_blkid.bi_lo),
851 	Assert(lockmethod < NumLockMethods);
852 	lockMethodTable = LockMethodTable[lockmethod];
853 	if (!lockMethodTable)
855 		elog(NOTICE, "lockMethodTable is null in LockRelease");
859 	if (LockingIsDisabled)
862 	LOCK_PRINT("Release", locktag, lockmode);
864 	masterLock = lockMethodTable->ctl->masterLock;
865 	xidTable = lockMethodTable->xidHash;
867 	SpinAcquire(masterLock);
869 	Assert(lockMethodTable->lockHash->hash == tag_hash);
	/* HASH_FIND_SAVE remembers the found slot for a later HASH_REMOVE_SAVED.
	 * NOTE(review): the HASH_REMOVE_SAVED below is issued against xidTable,
	 * not lockHash -- verify the save/remove pairing against the original. */
871 	hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND_SAVE, &found);
876 	 * If the entry is not found hash_search returns TRUE instead of NULL,
877 	 * so we must check it explicitly.
879 	if ((is_user_lock) && (lock == (LOCK *) TRUE))
881 		SpinRelease(masterLock);
882 		elog(NOTICE, "LockRelease: there are no locks with this tag");
888 	 * let the caller print its own error message, too. Do not
893 		SpinRelease(masterLock);
894 		elog(NOTICE, "LockRelease: locktable corrupted");
900 		SpinRelease(masterLock);
901 		elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
905 	Assert(lock->nHolding > 0);
910 	 * If this is an user lock it can be removed only after checking that
911 	 * it was acquired by the current process, so this code is skipped and
919 	 * fix the general lock stats
922 		lock->holders[lockmode]--;
924 		lock->activeHolders[lockmode]--;
926 		Assert(lock->nActive >= 0);
930 			/* ------------------
931 			 * if there's no one waiting in the queue,
932 			 * we just released the last lock.
933 			 * Delete it from the lock table.
936 			Assert(lockMethodTable->lockHash->hash == tag_hash);
937 			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
938 										(Pointer) &(lock->tag),
941 			Assert(lock && found);
942 			wakeupNeeded = false;
948 	/* ------------------
949 	 * Zero out all of the tag bytes (this clears the padding bytes for long
950 	 * word alignment and ensures hashing consistency).
953 	MemSet(&item, 0, XID_TAGSIZE);
955 	TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
956 	item.tag.lock = MAKE_OFFSET(lock);
958 	item.tag.pid = MyPid;
	/* user locks are keyed by backend pid rather than transaction id */
964 		item.tag.pid = MyProcPid;
966 #ifdef USER_LOCKS_DEBUG
967 		elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
968 			 item.tag.lock, item.tag.pid, item.tag.xid);
973 	if (!(result = (XIDLookupEnt *) hash_search(xidTable,
979 		SpinRelease(masterLock);
981 		if ((is_user_lock) && (result))
982 			elog(NOTICE, "LockRelease: you don't have a lock on this tag");
984 			elog(NOTICE, "LockRelease: find xid, table corrupted");
986 		elog(NOTICE, "LockReplace: xid table corrupted");
992 	 * now check to see if I have any private locks.  If I do, decrement
993 	 * the counts associated with them.
995 	result->holders[lockmode]--;
998 	XID_PRINT("LockRelease updated xid stats", result);
1001 	 * If this was my last hold on this lock, delete my entry in the XID
1004 	if (!result->nHolding)
1007 		if (result->queue.prev == INVALID_OFFSET)
1008 			elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
1009 		if (result->queue.next == INVALID_OFFSET)
1010 			elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
1012 		if (result->queue.next != INVALID_OFFSET)
1013 			SHMQueueDelete(&result->queue);
1014 		if (!(result = (XIDLookupEnt *)
1015 			  hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
1018 			SpinRelease(masterLock);
1020 			elog(NOTICE, "LockRelease: remove xid, table corrupted");
1022 			elog(NOTICE, "LockReplace: xid table corrupted");
1031 	 * If this is an user lock remove it now, after the corresponding xid
1032 	 * entry has been found and deleted.
	/* user-lock path: stats adjustment duplicated from above, deliberately
	 * deferred until the xid entry check succeeded (see comment at top) */
1038 	 * fix the general lock stats
1041 		lock->holders[lockmode]--;
1043 		lock->activeHolders[lockmode]--;
1045 		Assert(lock->nActive >= 0);
1047 		if (!lock->nHolding)
1049 			/* ------------------
1050 			 * if there's no one waiting in the queue,
1051 			 * we just released the last lock.
1052 			 * Delete it from the lock table.
1053 			 * ------------------
1055 			Assert(lockMethodTable->lockHash->hash == tag_hash);
1056 			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1057 										(Pointer) &(lock->tag),
1060 			Assert(lock && found);
1061 			wakeupNeeded = false;
1066 	/* --------------------------
1067 	 * If there are still active locks of the type I just released, no one
1068 	 * should be woken up.	Whoever is asleep will still conflict
1069 	 * with the remaining locks.
1070 	 * --------------------------
1072 	if (!(lock->activeHolders[lockmode]))
1074 		/* change the conflict mask.  No more of this lock type. */
1075 		lock->mask &= BITS_OFF[lockmode];
1080 		/* --------------------------
1081 		 * Wake the first waiting process and grant him the lock if it
1082 		 * doesn't conflict.  The woken process must record the lock
1084 		 * --------------------------
1086 		ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
1089 	SpinRelease(masterLock);
1094  * GrantLock -- update the lock data structure to show
1095  *		the new lock holder.
1098 GrantLock(LOCK *lock, LOCKMODE lockmode)
	/* bump the active-holder count for this mode and set its bit in the
	 * conflict mask (checked by LockResolveConflicts against lock->mask) */
1101 	lock->activeHolders[lockmode]++;
1102 	lock->mask |= BITS_ON[lockmode];
1107  * LockReleaseAll -- Release all locks in a process lock queue.
1109  * Note: This code is a little complicated by the presence in the
1110  *		 same queue of user locks which can't be removed from the
1111  *		 normal lock queue at the end of a transaction. They must
1112  *		 however be removed when the backend exits.
1113  *		 A dummy lockmethod 0 is used to indicate that we are releasing
1114  *		 the user locks, from the code added to ProcKill().
1118 LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
1120 	PROC_QUEUE *waitQueue;
1122 	XIDLookupEnt *xidLook = NULL;
1123 	XIDLookupEnt *tmp = NULL;
1124 	SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1125 	SPINLOCK	masterLock;
1126 	LOCKMETHODTABLE *lockMethodTable;
1133 	int			is_user_lock_table,
	/* lockmethod 0 means "release this backend's user locks" (ProcKill) */
1137 	is_user_lock_table = (lockmethod == 0);
1138 #ifdef USER_LOCKS_DEBUG
1139 	elog(NOTICE, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid);
1141 	if (is_user_lock_table)
1145 	Assert(lockmethod < NumLockMethods);
1146 	lockMethodTable = LockMethodTable[lockmethod];
1147 	if (!lockMethodTable)
1150 	numLockModes = lockMethodTable->ctl->numLockModes;
1151 	masterLock = lockMethodTable->ctl->masterLock;
1153 	if (SHMQueueEmpty(lockQueue))
1157 	SpinAcquire(masterLock);
1159 	SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1161 	XID_PRINT("LockReleaseAll", xidLook);
	/* NOTE(review): SpinAcquire appears both before and after SHMQueueFirst
	 * in this listing; intervening control flow seems elided -- verify the
	 * locking protocol against the original file */
1164 		SpinAcquire(masterLock);
1170 		/* ---------------------------
1171 		 * XXX Here we assume the shared memory queue is circular and
1172 		 * that we know its internal structure.  Should have some sort of
1173 		 * macros to allow one to walk it.	mer 20 July 1991
1174 		 * ---------------------------
1176 		done = (xidLook->queue.next == end);
1177 		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1179 		LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);
1184 		 * Sometimes the queue appears to be messed up.
1188 			elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
1192 		if (is_user_lock_table)
			/* releasing user locks: skip entries keyed by xid (normal locks) */
1194 			if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
1196 #ifdef USER_LOCKS_DEBUG
1197 				elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
1198 					 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1203 			if (xidLook->tag.pid != MyProcPid)
1205 				/* This should never happen */
1206 #ifdef USER_LOCKS_DEBUG
1208 					 "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
1209 					 lock->tag.tupleId.ip_posid,
1210 					 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1211 					  lock->tag.tupleId.ip_blkid.bi_lo),
1212 					 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1217 #ifdef USER_LOCKS_DEBUG
1219 				 "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
1220 				 lock->tag.tupleId.ip_posid,
1221 				 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1222 				  lock->tag.tupleId.ip_blkid.bi_lo),
1223 				 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
			/* releasing transaction locks: skip entries keyed by pid (user locks) */
1228 			if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
1230 #ifdef USER_LOCKS_DEBUG
1232 					 "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
1233 					 lock->tag.tupleId.ip_posid,
1234 					 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1235 					  lock->tag.tupleId.ip_blkid.bi_lo),
1236 					 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1241 #ifdef USER_LOCKS_DEBUG
1242 			elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
1243 				 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1248 		/* ------------------
1249 		 * fix the general lock stats
1250 		 * ------------------
1252 		if (lock->nHolding != xidLook->nHolding)
1254 			lock->nHolding -= xidLook->nHolding;
1255 			lock->nActive -= xidLook->nHolding;
1256 			Assert(lock->nActive >= 0);
1257 			for (i = 1; i <= numLockModes; i++)
1259 				lock->holders[i] -= xidLook->holders[i];
1260 				lock->activeHolders[i] -= xidLook->holders[i];
1261 				if (!lock->activeHolders[i])
1262 					lock->mask &= BITS_OFF[i];
1268 			 * set nHolding to zero so that we can garbage collect the lock
1275 		 * always remove the xidLookup entry, we're done with it now
1279 		SHMQueueDelete(&xidLook->queue);
1281 		if ((!hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
1284 			SpinRelease(masterLock);
1285 			elog(NOTICE, "LockReleaseAll: xid table corrupted");
1289 		if (!lock->nHolding)
1291 			/* --------------------
1292 			 * if there's no one waiting in the queue, we've just released
1294 			 * --------------------
1297 			Assert(lockMethodTable->lockHash->hash == tag_hash);
1299 				hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
1300 			if ((!lock) || (!found))
1302 				SpinRelease(masterLock);
1303 				elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1309 			/* --------------------
1310 			 * Wake the first waiting process and grant him the lock if it
1311 			 * doesn't conflict.  The woken process must record the lock
1313 			 * --------------------
1315 			waitQueue = &(lock->waitProcs);
1316 			ProcLockWakeup(waitQueue, lockmethod, lock);
		/* advance to the next xid entry in the shared queue */
1324 		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1327 	SpinRelease(masterLock);
1331 	 * Reinitialize the queue only if nothing has been left in.
1335 		SHMQueueInit(lockQueue);
	/*
	 * Interior of LockShmemSize (the function header is elided in this
	 * listing): estimate the shared-memory space needed for per-backend
	 * PROC structs, the lock hash table, and the xid hash table.
	 */
1348 	nLockBuckets = 1 << (int) my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1);
1349 	nLockSegs = 1 << (int) my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1351 	nXidBuckets = 1 << (int) my_log2((NLOCKS_PER_XACT - 1) / DEF_FFACTOR + 1);
	/* NOTE(review): nXidSegs is computed from nLockBuckets rather than
	 * nXidBuckets -- this looks like a copy/paste slip; confirm and fix */
1352 	nXidSegs = 1 << (int) my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1354 	size += MAXALIGN(NBACKENDS * sizeof(PROC)); /* each MyProc */
1355 	size += MAXALIGN(NBACKENDS * sizeof(LOCKMETHODCTL));		/* each lockMethodTable->ctl */
1356 	size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
	/* lock hash table: directory, header, segments, and records */
1358 	size += MAXALIGN(my_log2(NLOCKENTS) * sizeof(void *));
1359 	size += MAXALIGN(sizeof(HHDR));
1360 	size += nLockSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1361 	size += NLOCKENTS *			/* XXX not multiple of BUCKET_ALLOC_INCR? */
1362 		(MAXALIGN(sizeof(BUCKET_INDEX)) +
1363 		 MAXALIGN(sizeof(LOCK)));		/* contains hash key */
	/* xid hash table: directory, header, segments, and records */
1365 	size += MAXALIGN(my_log2(NBACKENDS) * sizeof(void *));
1366 	size += MAXALIGN(sizeof(HHDR));
1367 	size += nXidSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1368 	size += NBACKENDS *			/* XXX not multiple of BUCKET_ALLOC_INCR? */
1369 		(MAXALIGN(sizeof(BUCKET_INDEX)) +
1370 		 MAXALIGN(sizeof(XIDLookupEnt)));		/* contains hash key */
1375 /* -----------------
1376  * Boolean function to determine current locking status
	/* accessor (presumably LockingDisabled(); signature elided in this
	 * listing) for the module-wide flag set by LockDisable() */
1382 	return LockingIsDisabled;
1386 * DeadlockCheck -- Checks for deadlocks for a given process
1388 * We can't block on user locks, so no sense testing for deadlock
1389 * because there is no blocking, and no timer for the block.
1391 * This code takes a list of locks a process holds, and the lock that
1392 * the process is sleeping on, and tries to find if any of the processes
1393 * waiting on its locks hold the lock it is waiting for. If no deadlock
1394 * is found, it goes on to look at all the processes waiting on their locks.
1396 * We have already locked the master lock before being called.
/*
 * NOTE(review): recursive wait-for-graph walk.  skip_check is true only on
 * the outermost call (the lock we are waiting for is in our own lockQueue
 * and must not trigger a false positive there).  Listing is sparsely
 * sampled: braces and some statements are missing between lines below.
 */
1399 DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
1402 XIDLookupEnt *xidLook = NULL;
1403 XIDLookupEnt *tmp = NULL;
/* 'end' marks the queue head so we can detect queue wrap-around. */
1404 SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1407 LOCKMETHODTABLE *lockMethodTable;
1408 XIDLookupEnt *result,
/*
 * Static across recursive calls: the set of PROCs already visited
 * (cycle prevention) and whether we hold anything on findlock.
 * NOTE(review): static state makes this non-reentrant — presumably safe
 * because the master spinlock is held throughout; confirm.
 */
1413 static PROC *checked_procs[MaxBackendId];
1415 static bool MyNHolding;
1417 /* initialize at start of recursion */
1420 checked_procs[0] = MyProc;
/* NOTE(review): hard-coded method index 1 (the default lock method). */
1423 lockMethodTable = LockMethodTable[1];
1424 xidTable = lockMethodTable->xidHash;
/* Look up our own (xid, findlock) entry to learn what we already hold. */
1426 MemSet(&item, 0, XID_TAGSIZE);
1427 TransactionIdStore(MyProc->xid, &item.tag.xid);
1428 item.tag.lock = MAKE_OFFSET(findlock);
1433 if (!(result = (XIDLookupEnt *)
1434 hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
/* NOTE(review): message names "LockAcquire" but we are in DeadLockCheck. */
1436 elog(NOTICE, "LockAcquire: xid table corrupted");
1439 MyNHolding = result->nHolding;
1441 if (SHMQueueEmpty(lockQueue))
1444 SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1446 XID_PRINT("DeadLockCheck", xidLook);
1450 /* ---------------------------
1451 * XXX Here we assume the shared memory queue is circular and
1452 * that we know its internal structure. Should have some sort of
1453 * macros to allow one to walk it. mer 20 July 1991
1454 * ---------------------------
1456 done = (xidLook->queue.next == end);
1457 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1459 LOCK_PRINT("DeadLockCheck", (&lock->tag), 0);
1462 * This is our only check to see if we found the lock we want.
1464 * The lock we are waiting for is already in MyProc->lockQueue so we
1465 * need to skip it here. We are trying to find it in someone
1468 if (lock == findlock && !skip_check)
/* Walk this lock's wait queue looking for a cycle back to us. */
1472 PROC_QUEUE *waitQueue = &(lock->waitProcs);
1477 proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
1478 for (i = 0; i < waitQueue->size; i++)
1480 if (proc != MyProc &&
1481 lock == findlock && /* skip_check also true */
1482 MyNHolding) /* I already hold some lock on it */
1486 * For findlock's wait queue, we are interested in
1487 * procs who are blocked waiting for a write-lock on
1488 * the table we are waiting on, and already hold a
1489 * lock on it. We first check to see if there is an
1490 * escalation deadlock, where we hold a readlock and
1491 * want a writelock, and someone else holds readlock
1492 * on the same table, and wants a writelock.
1494 * Basically, the test is, "Do we both hold some lock on
1495 * findlock, and we are both waiting in the lock
1500 Assert(MyProc->prio == 2);
/* NOTE(review): same hard-coded method index 1 as above. */
1502 lockMethodTable = LockMethodTable[1];
1503 xidTable = lockMethodTable->xidHash;
/* Does 'proc' also hold something on findlock?  If so: escalation cycle. */
1505 MemSet(&item, 0, XID_TAGSIZE);
1506 TransactionIdStore(proc->xid, &item.tag.xid);
1507 item.tag.lock = MAKE_OFFSET(findlock);
1512 if (!(result = (XIDLookupEnt *)
1513 hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
/* NOTE(review): misleading "LockAcquire" prefix here too. */
1515 elog(NOTICE, "LockAcquire: xid table corrupted");
1518 if (result->nHolding)
1523 * No sense in looking at the wait queue of the lock we
1524 * are looking for. If lock == findlock, and I got here,
1525 * skip_check must be true too.
1527 if (lock != findlock)
/* Skip PROCs we have already visited (prevents infinite recursion). */
1529 for (j = 0; j < nprocs; j++)
1530 if (checked_procs[j] == proc)
1532 if (j >= nprocs && lock != findlock)
1534 checked_procs[nprocs++] = proc;
1535 Assert(nprocs <= MaxBackendId);
1538 * For non-MyProc entries, we are looking only
1539 * waiters, not necessarily people who already
1540 * hold locks and are waiting. Now we check for
1541 * cases where we have two or more tables in a
1542 * deadlock. We do this by continuing to search
1543 * for someone holding a lock
/* Recurse into this waiter's own lock queue (skip_check=false). */
1545 if (DeadLockCheck(&(proc->lockQueue), findlock, false))
1549 proc = (PROC *) MAKE_PTR(proc->links.prev);
1555 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1559 /* if we got here, no deadlock */
1563 #ifdef DEADLOCK_DEBUG
1565 * Dump all locks. Must have already acquired the masterLock.
/*
 * NOTE(review): debug-only DumpLocks(); its signature and closing lines
 * fall outside this sparsely-sampled excerpt.  Walks the current
 * backend's lockQueue, printing each lock via LOCK_DUMP/XID_PRINT.
 */
1570 SHMEM_OFFSET location;
1572 SHM_QUEUE *lockQueue;
1574 XIDLookupEnt *xidLook = NULL;
1575 XIDLookupEnt *tmp = NULL;
1577 SPINLOCK masterLock;
1583 LOCKMETHODTABLE *lockMethodTable;
/* Find our own PROC struct in shared memory by PID. */
1585 ShmemPIDLookup(MyProcPid, &location);
1586 if (location == INVALID_OFFSET)
1588 proc = (PROC *) MAKE_PTR(location);
1591 lockQueue = &proc->lockQueue;
1593 Assert(lockmethod < NumLockMethods);
1594 lockMethodTable = LockMethodTable[lockmethod];
1595 if (!lockMethodTable)
1598 numLockModes = lockMethodTable->ctl->numLockModes;
1599 masterLock = lockMethodTable->ctl->masterLock;
1601 if (SHMQueueEmpty(lockQueue))
1604 SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
/* 'end' marks the queue head so the circular walk knows when to stop. */
1605 end = MAKE_OFFSET(lockQueue);
1607 LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
1608 XID_PRINT("DumpLocks", xidLook);
1612 /* ---------------------------
1613 * XXX Here we assume the shared memory queue is circular and
1614 * that we know its internal structure. Should have some sort of
1615 * macros to allow one to walk it. mer 20 July 1991
1616 * ---------------------------
1618 done = (xidLook->queue.next == end);
1619 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1621 LOCK_DUMP("DumpLocks", lock, 0);
/* Defensive cap: bail out if the queue appears to loop endlessly. */
1625 elog(NOTICE, "DumpLocks: xid loop detected, giving up");
1631 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);