1 /*-------------------------------------------------------------------------
4 * simple lock acquisition
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.21 1998/01/25 05:14:02 momjian Exp $
13 * Outside modules can create a lock table and acquire/release
14 * locks. A lock table is a shared memory hash table. When
15 * a process tries to acquire a lock of a type that conflicts
16 * with existing locks, it is put to sleep using the routines
17 * in storage/lmgr/proc.c.
21 * LockAcquire(), LockRelease(), LockTabInit().
23 * LockReplace() is called only within this module and by the
24 * lkchain module. It releases a lock without looking
25 * the lock up in the lock table.
27 * NOTE: This module is used to define new lock tables. The
28 * multi-level lock table (multi.c) used by the heap
29 * access methods calls these routines. See multi.c for
30 * examples showing how to use this interface.
32 *-------------------------------------------------------------------------
34 #include <stdio.h> /* for sprintf() */
36 #include <sys/types.h>
40 #include "miscadmin.h"
41 #include "storage/shmem.h"
42 #include "storage/spin.h"
43 #include "storage/proc.h"
44 #include "storage/lock.h"
45 #include "utils/dynahash.h"
46 #include "utils/hsearch.h"
47 #include "utils/memutils.h"
48 #include "utils/palloc.h"
49 #include "access/xact.h"
50 #include "access/transam.h"
52 static int WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock,
/*
 * Debug instrumentation: compiled in only when LOCK_MGR_DEBUG is defined.
 */
55 /*#define LOCK_MGR_DEBUG*/
57 #ifndef LOCK_MGR_DEBUG
/* Non-debug build: all tracing macros expand to nothing. */
59 #define LOCK_PRINT(where,tag,type)
60 #define LOCK_DUMP(where,lock,type)
61 #define LOCK_DUMP_AUX(where,lock,type)
62 #define XID_PRINT(where,xidentP)
64 #else							/* LOCK_MGR_DEBUG */
/* Tracing is suppressed for relIds below this threshold (bootstrap/system
 * relations); see the relId comparisons in the macros below. */
67 unsigned int lock_debug_oid_min = BootstrapObjectIdData;
68 static char *lock_types[] = {
/* LOCK_PRINT: trace a lock tag (rel/db/tid/type) when lockDebug >= 1. */
77 #define LOCK_PRINT(where,tag,type)\
78 	if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
80 			 "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
82 			 tag->relId, tag->dbId, \
83 			 ((tag->tupleId.ip_blkid.bi_hi<<16)+\
84 			  tag->tupleId.ip_blkid.bi_lo),\
85 			 tag->tupleId.ip_posid, \
88 #define LOCK_DUMP(where,lock,type)\
89 	if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
90 		LOCK_DUMP_AUX(where,lock,type)
/* LOCK_DUMP_AUX: unconditional dump variant; also prints holder counts. */
92 #define LOCK_DUMP_AUX(where,lock,type)\
94 			 "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
95 			 "holders (%d,%d,%d,%d,%d) type (%s)",where, \
97 			 lock->tag.relId, lock->tag.dbId, \
98 			 ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
99 			  lock->tag.tupleId.ip_blkid.bi_lo),\
100 			 lock->tag.tupleId.ip_posid, \
109 #define XID_PRINT(where,xidentP)\
110 	if ((lockDebug >= 2) && \
111 		(((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
112 		 >= lock_debug_oid_min)) \
114 			 "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
115 			 "holders (%d,%d,%d,%d,%d)",\
122 			 xidentP->holders[1],\
123 			 xidentP->holders[2],\
124 			 xidentP->holders[3],\
125 			 xidentP->holders[4],\
128 #endif							/* LOCK_MGR_DEBUG */
/*
 * Module-global lock-manager state.
 */
130 SPINLOCK	LockMgrLock;		/* in Shmem or created in
131 								 * CreateSpinlocks() */
133 /* This is to simplify/speed up some bit arithmetic */
/* BITS_ON[t] / BITS_OFF[t]: single-bit masks to set/clear lock type t in
 * lock->mask (see GrantLock and the mask updates in LockRelease). */
135 static MASK BITS_OFF[MAX_LOCKTYPES];
136 static MASK BITS_ON[MAX_LOCKTYPES];
139  * XXX Want to move this to this file
142 static bool LockingIsDisabled;
/* NumTables starts at 1: tableId 0 is reserved as the dummy user-lock table
 * id (see the "Note on User Locks" comment above LockAcquire). */
144 /* -------------------
145  * map from tableId to the lock table structure
146  * -------------------
148 static LOCKTAB *AllTables[MAX_TABLES];
150 /* -------------------
152  * -------------------
154 static int	NumTables = 1;
/*
 * InitLocks() -- one-time module setup. The visible loop walks all lock
 * types with a doubling bit; presumably it fills BITS_ON/BITS_OFF —
 * loop body not shown here, confirm against full source.
 */
156 /* -------------------
157  * InitLocks -- Init the lock module.  Create a private data
158  * structure for constructing conflict masks.
159  * -------------------
168 	/* -------------------
169 	 * remember 0th locktype is invalid
170 	 * -------------------
172 	for (i = 0; i < MAX_LOCKTYPES; i++, bit <<= 1)
/*
 * LockDisable() -- set the module-wide LockingIsDisabled flag.
 * While TRUE, LockAcquire and LockRelease short-circuit without doing
 * any locking (see the LockingIsDisabled checks in those functions).
 */
179 /* -------------------
180  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
184 LockDisable(int status)
186 	LockingIsDisabled = status;
191  * LockTypeInit -- initialize the lock table's lock type
194  * Notes: just copying.  Should only be called once.
197 LockTypeInit(LOCKTAB *ltable,
204 	ltable->ctl->nLockTypes = ntypes;
	/* Copy the caller's conflict masks and priorities into the shared
	 * control structure, one entry per lock type. */
206 	for (i = 0; i < ntypes; i++, prioP++, conflictsP++)
208 		ltable->ctl->conflictTab[i] = *conflictsP;
209 		ltable->ctl->prio[i] = *prioP;
214  * LockTabInit -- initialize a lock table structure
217  *		(a) a lock table has four separate entries in the binding
218  *		table.	This is because every shared hash table and spinlock
219  *		has its name stored in the binding table at its creation.  It
220  *		is wasteful, in this case, but not much space is involved.
224 LockTabInit(char *tabName,
	/* Validate limits before allocating anything. */
236 	if (ntypes > MAX_LOCKTYPES)
238 		elog(NOTICE, "LockTabInit: too many lock types %d greater than %d",
239 			 ntypes, MAX_LOCKTYPES);
240 		return (INVALID_TABLEID);
243 	if (NumTables > MAX_TABLES)
246 			 "LockTabInit: system limit of MAX_TABLES (%d) lock tables",
248 		return (INVALID_TABLEID);
251 	/* allocate a string for the binding table lookup */
252 	shmemName = (char *) palloc((unsigned) (strlen(tabName) + 32));
255 		elog(NOTICE, "LockTabInit: couldn't malloc string %s \n", tabName);
256 		return (INVALID_TABLEID);
259 	/* each lock table has a non-shared header */
260 	ltable = (LOCKTAB *) palloc((unsigned) sizeof(LOCKTAB));
263 		elog(NOTICE, "LockTabInit: couldn't malloc lock table %s\n", tabName);
265 		return (INVALID_TABLEID);
268 	/* ------------------------
269 	 * find/acquire the spinlock for the table
270 	 * ------------------------
272 	SpinAcquire(LockMgrLock);
	/* Creation/attachment of shared structures is serialized under the
	 * lock manager's master spinlock. */
275 	/* -----------------------
276 	 * allocate a control structure from shared memory or attach to it
277 	 * if it already exists.
278 	 * -----------------------
280 	sprintf(shmemName, "%s (ctl)", tabName);
281 	ltable->ctl = (LOCKCTL *)
282 		ShmemInitStruct(shmemName, (unsigned) sizeof(LOCKCTL), &found);
286 		elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
291 	 * we're first - initialize
296 		MemSet(ltable->ctl, 0, sizeof(LOCKCTL));
297 		ltable->ctl->masterLock = LockMgrLock;
298 		ltable->ctl->tableId = NumTables;
301 	/* --------------------
302 	 * other modules refer to the lock table by a tableId
303 	 * --------------------
305 	AllTables[NumTables] = ltable;
307 	Assert(NumTables <= MAX_TABLES);
	/* Shared hash table mapping LOCKTAG -> LOCK, created or attached by
	 * name in the binding table. */
309 	/* ----------------------
310 	 * allocate a hash table for the lock tags.  This is used
311 	 * to find the different locks.
312 	 * ----------------------
314 	info.keysize = sizeof(LOCKTAG);
315 	info.datasize = sizeof(LOCK);
316 	info.hash = tag_hash;
317 	hash_flags = (HASH_ELEM | HASH_FUNCTION);
319 	sprintf(shmemName, "%s (lock hash)", tabName);
320 	ltable->lockHash = (HTAB *) ShmemInitHash(shmemName,
321 								   INIT_TABLE_SIZE, MAX_TABLE_SIZE,
324 	Assert(ltable->lockHash->hash == tag_hash);
	/* NOTE(review): the Assert above dereferences lockHash before the NULL
	 * check below — harmless only if ShmemInitHash never returns NULL when
	 * asserts are enabled; confirm. */
325 	if (!ltable->lockHash)
327 		elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
331 	/* -------------------------
332 	 * allocate an xid table.  When different transactions hold
333 	 * the same lock, additional information must be saved (locks per tx).
334 	 * -------------------------
336 	info.keysize = XID_TAGSIZE;
337 	info.datasize = sizeof(XIDLookupEnt);
338 	info.hash = tag_hash;
339 	hash_flags = (HASH_ELEM | HASH_FUNCTION);
341 	sprintf(shmemName, "%s (xid hash)", tabName);
342 	ltable->xidHash = (HTAB *) ShmemInitHash(shmemName,
343 								  INIT_TABLE_SIZE, MAX_TABLE_SIZE,
346 	if (!ltable->xidHash)
348 		elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
352 	/* init ctl data structures */
353 	LockTypeInit(ltable, conflictsP, prioP, ntypes);
355 	SpinRelease(LockMgrLock);
360 	return (ltable->ctl->tableId);
	/* NOTE(review): second return looks unreachable — presumably an
	 * error-exit label path whose cleanup code is elided here; confirm. */
362 	return (INVALID_TABLEID);
366  * LockTabRename -- allocate another tableId to the same
369  * NOTES: Both the lock module and the lock chain (lchain.c)
370  *		module use table id's to distinguish between different
371  *		kinds of locks.  Short term and long term locks look
372  *		the same to the lock table, but are handled differently
373  *		by the lock chain manager.	This function allows the
374  *		client to use different tableIds when acquiring/releasing
375  *		short term and long term locks.
379 LockTabRename(LockTableId tableId)
381 	LockTableId newTableId;
	/* Fail if the table array is full or the source slot is unused. */
383 	if (NumTables >= MAX_TABLES)
385 		return (INVALID_TABLEID);
	/* NOTE(review): compares a LOCKTAB pointer against INVALID_TABLEID —
	 * relies on INVALID_TABLEID being 0/NULL-equivalent; confirm. */
387 	if (AllTables[tableId] == INVALID_TABLEID)
389 		return (INVALID_TABLEID);
392 	/* other modules refer to the lock table by a tableId */
393 	newTableId = NumTables;
396 	AllTables[newTableId] = AllTables[tableId];
	/* Both ids now alias the same LOCKTAB (shared hash tables and all). */
403  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
404  *		set lock if/when no conflicts.
406  * Returns: TRUE if parameters are correct, FALSE otherwise.
408  * Side Effects: The lock is always acquired.  No way to abort
409  *		a lock acquisition other than aborting the transaction.
410  *		Lock is recorded in the lkchain.
412  * Note on User Locks:
413  *		User locks are handled totally on the application side as
414  *		long term cooperative locks which extend beyond the normal
415  *		transaction boundaries.  Their purpose is to indicate to an
416  *		application that someone is `working' on an item.  So it is
417  *		possible to put an user lock on a tuple's oid, retrieve the
418  *		tuple, work on it for an hour and then update it and remove
419  *		the lock.  While the lock is active other clients can still
420  *		read and write the tuple but they can be aware that it has
421  *		been locked at the application level by someone.
422  *		User locks use lock tags made of an uint16 and an uint32, for
423  *		example 0 and a tuple oid, or any other arbitrary pair of
424  *		numbers following a convention established by the application.
425  *		In this sense tags don't refer to tuples or database entities.
426  *		User locks and normal locks are completely orthogonal and
427  *		they don't interfere with each other, so it is possible
428  *		to acquire a normal lock on an user-locked tuple or user-lock
429  *		a tuple for which a normal write lock already exists.
430  *		User locks are always non blocking, therefore they are never
431  *		acquired if already held by another process.  They must be
432  *		released explicitly by the application but they are released
433  *		automatically when a backend terminates.
434  *		They are indicated by a dummy tableId 0 which doesn't have
435  *		any table allocated but uses the normal lock table, and are
436  *		distinguished from normal locks for the following differences:
438  *										normal lock		user lock
441  *		tag.relId						rel oid			0
442  *		tag.ItemPointerData.ip_blkid	block id		lock id2
443  *		tag.ItemPointerData.ip_posid	tuple offset	lock id1
444  *		xid.pid							0				backend pid
445  *		xid.xid							current xid		0
446  *		persistence						transaction		user or backend
448  *		The lockt parameter can have the same values for normal locks
449  *		although probably only WRITE_LOCK can have some practical use.
456 LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
458 	XIDLookupEnt *result,
471 	is_user_lock = (tableId == 0);
	/* tableId 0 is the dummy user-lock table described above. */
475 #ifdef USER_LOCKS_DEBUG
476 		elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
477 			 lockName->tupleId.ip_posid,
478 			 ((lockName->tupleId.ip_blkid.bi_hi << 16) +
479 			  lockName->tupleId.ip_blkid.bi_lo),
485 	Assert(tableId < NumTables);
486 	ltable = AllTables[tableId];
489 		elog(NOTICE, "LockAcquire: bad lock table %d", tableId);
493 	if (LockingIsDisabled)
498 	LOCK_PRINT("Acquire", lockName, lockt);
499 	masterLock = ltable->ctl->masterLock;
501 	SpinAcquire(masterLock);
	/* All shared lock structures are protected by the master spinlock.
	 * Find or create the shared LOCK entry for this tag. */
503 	Assert(ltable->lockHash->hash == tag_hash);
504 	lock = (LOCK *) hash_search(ltable->lockHash, (Pointer) lockName, HASH_ENTER, &found);
508 		SpinRelease(masterLock);
509 		elog(FATAL, "LockAcquire: lock table %d is corrupted", tableId);
513 	/* --------------------
514 	 * if there was nothing else there, complete initialization
515 	 * --------------------
520 		ProcQueueInit(&(lock->waitProcs));
521 		MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKTYPES);
522 		MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKTYPES);
526 		Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
527 							 &(lockName->tupleId.ip_blkid)));
531 	/* ------------------
532 	 * add an element to the lock queue so that we can clear the
533 	 * locks at end of transaction.
536 	xidTable = ltable->xidHash;
537 	myXid = GetCurrentTransactionId();
	/* Key for the per-transaction (xid) entry: (xid, lock offset, pid). */
539 	/* ------------------
540 	 * Zero out all of the tag bytes (this clears the padding bytes for long
541 	 * word alignment and ensures hashing consistency).
544 	MemSet(&item, 0, XID_TAGSIZE);		/* must clear padding, needed */
545 	TransactionIdStore(myXid, &item.tag.xid);
546 	item.tag.lock = MAKE_OFFSET(lock);
548 	item.tag.pid = MyPid;
	/* User-lock branch (selection logic elided here): keyed by backend pid
	 * with xid zeroed, instead of by transaction id — confirm in full file. */
554 		item.tag.pid = MyProcPid;
555 		item.tag.xid = myXid = 0;
556 #ifdef USER_LOCKS_DEBUG
557 		elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
558 			 item.tag.lock, item.tag.pid, item.tag.xid);
563 	result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
566 		elog(NOTICE, "LockAcquire: xid table corrupted");
567 		return (STATUS_ERROR);
571 		XID_PRINT("LockAcquire: queueing XidEnt", result);
572 		ProcAddLock(&result->queue);
573 		result->nHolding = 0;
574 		MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKTYPES);
578 	 * lock->nholding tells us how many processes have _tried_ to
579 	 * acquire this lock,  Regardless of whether they succeeded or
580 	 * failed in doing so.
584 	lock->holders[lockt]++;
	/* holders[] counts requests (granted or not); activeHolders[] counts
	 * granted holds (incremented in GrantLock). */
586 	/* --------------------
587 	 * If I'm the only one holding a lock, then there
588 	 * cannot be a conflict.	Need to subtract one from the
589 	 * lock's count since we just bumped the count up by 1
591 	 * --------------------
593 	if (result->nHolding == lock->nActive)
595 		result->holders[lockt]++;
597 		GrantLock(lock, lockt);
598 		SpinRelease(masterLock);
602 	Assert(result->nHolding <= lock->nActive);
	/* Check my request against locks held by other transactions. */
604 	status = LockResolveConflicts(ltable, lock, lockt, myXid);
606 	if (status == STATUS_OK)
608 		GrantLock(lock, lockt);
610 	else if (status == STATUS_FOUND)
615 		 * User locks are non blocking. If we can't acquire a lock remove
616 		 * the xid entry and return FALSE without waiting.
620 			if (!result->nHolding)
622 				SHMQueueDelete(&result->queue);
623 				hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
626 			lock->holders[lockt]--;
627 			SpinRelease(masterLock);
	/* Conflict: back out the request count and fail without blocking
	 * (user locks), or fall through to sleep (normal locks). */
628 #ifdef USER_LOCKS_DEBUG
629 			elog(NOTICE, "LockAcquire: user lock failed");
	/* Normal locks block here until another backend grants us the lock. */
634 		status = WaitOnLock(ltable, tableId, lock, lockt);
635 		XID_PRINT("Someone granted me the lock", result);
638 	SpinRelease(masterLock);
640 	return (status == STATUS_OK);
643 /* ----------------------------
644  * LockResolveConflicts -- test for lock conflicts
647  * Here's what makes this complicated: one transaction's
648  * locks don't conflict with one another.  When many processes
649  * hold locks, each has to subtract off the other's locks when
650  * determining whether or not any new lock acquired conflicts with
653  * For example, if I am already holding a WRITE_INTENT lock,
654  * there will not be a conflict with my own READ_LOCK.	If I
655  * don't consider the intent lock when checking for conflicts,
656  * I find no conflict.
657  * ----------------------------
660 LockResolveConflicts(LOCKTAB *ltable,
665 	XIDLookupEnt *result,
675 	nLockTypes = ltable->ctl->nLockTypes;
676 	xidTable = ltable->xidHash;
678 	/* ---------------------
679 	 * read my own statistics from the xid table.  If there
680 	 * isn't an entry, then we'll just add one.
682 	 * Zero out the tag, this clears the padding bytes for long
683 	 * word alignment and ensures hashing consistency.
686 	MemSet(&item, 0, XID_TAGSIZE);
687 	TransactionIdStore(xid, &item.tag.xid);
688 	item.tag.lock = MAKE_OFFSET(lock);
	/* Look up (or create) this transaction's own entry for the lock. */
693 	if (!(result = (XIDLookupEnt *)
694 		  hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
696 		elog(NOTICE, "LockResolveConflicts: xid table corrupted");
697 		return (STATUS_ERROR);
699 	myHolders = result->holders;
704 		 * we're not holding any type of lock yet.  Clear
708 		MemSet(result->holders, 0, nLockTypes * sizeof(*(lock->holders)));
709 		result->nHolding = 0;
713 	/* ------------------------
714 	 * If someone with a greater priority is waiting for the lock,
715 	 * do not continue and share the lock, even if we can.	bjm
716 	 * ------------------------
718 		int			myprio = ltable->ctl->prio[lockt];
719 		PROC_QUEUE *waitQueue = &(lock->waitProcs);
720 		PROC	   *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
	/* Defer to a higher-priority waiter rather than jumping the queue. */
722 		if (waitQueue->size && topproc->prio > myprio)
726 	/* ----------------------------
727 	 * first check for global conflicts: If no locks conflict
728 	 * with mine, then I get the lock.
730 	 * Checking for conflict: lock->mask represents the types of
731 	 * currently held locks.  conflictTable[lockt] has a bit
732 	 * set for each type of lock that conflicts with mine.	Bitwise
733 	 * compare tells if there is a conflict.
734 	 * ----------------------------
736 	if (!(ltable->ctl->conflictTab[lockt] & lock->mask))
739 		result->holders[lockt]++;
742 		XID_PRINT("Conflict Resolved: updated xid entry stats", result);
747 	/* ------------------------
748 	 * Rats.  Something conflicts. But it could still be my own
749 	 * lock.  We have to construct a conflict mask
750 	 * that does not reflect our own locks.
751 	 * ------------------------
755 	for (i = 1; i <= nLockTypes; i++, tmpMask <<= 1)
	/* Lock types are 1-based (type 0 is invalid); set a bit for each type
	 * that some OTHER transaction actively holds. */
757 		if (lock->activeHolders[i] - myHolders[i])
763 	/* ------------------------
764 	 * now check again for conflicts.  'bitmask' describes the types
765 	 * of locks held by other processes.  If one of these
766 	 * conflicts with the kind of lock that I want, there is a
767 	 * conflict and I have to sleep.
768 	 * ------------------------
770 	if (!(ltable->ctl->conflictTab[lockt] & bitmask))
773 		/* no conflict. Get the lock and go on */
775 		result->holders[lockt]++;
778 		XID_PRINT("Conflict Resolved: updated xid entry stats", result);
	/* STATUS_FOUND = a genuine conflict with another transaction exists;
	 * the caller (LockAcquire) decides whether to sleep or fail. */
784 	return (STATUS_FOUND);
/*
 * WaitOnLock() -- block on 'lock' via ProcSleep until granted, or until
 * an error wakeup (e.g. deadlock handling) — in which case the request
 * counters are rolled back, the master spinlock is released, and
 * elog(ERROR) aborts the current transaction.
 */
788 WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock, LOCKT lockt)
790 	PROC_QUEUE *waitQueue = &(lock->waitProcs);
792 	int			prio = ltable->ctl->prio[lockt];
795 	 * the waitqueue is ordered by priority. I insert myself according to
796 	 * the priority of the lock I am acquiring.
798 	 * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
799 	 * synchronization for this queue.	That will not be true if/when
800 	 * people can be deleted from the queue by a SIGINT or something.
802 	LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockt);
	/* Sleep; a non-zero return from ProcSleep signals an abnormal wakeup. */
803 	if (ProcSleep(waitQueue,
804 				  ltable->ctl->masterLock,
809 		/* -------------------
810 		 * This could have happened as a result of a deadlock, see HandleDeadLock()
811 		 * Decrement the lock nHolding and holders fields as we are no longer
812 		 * waiting on this lock.
813 		 * -------------------
816 		lock->holders[lockt]--;
817 		LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockt);
818 		SpinRelease(ltable->ctl->masterLock);
819 		elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
822 	LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockt);
827  * LockRelease -- look up 'lockName' in lock table 'tableId' and
830  * Side Effects: if the lock no longer conflicts with the highest
831  *		priority waiting process, that process is granted the lock
832  *		and awoken. (We have to grant the lock here to avoid a
833  *		race between the waking process and any new process to
834  *		come along and request the lock).
837 LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
843 	XIDLookupEnt *result,
846 	bool		wakeupNeeded = true;
	/* wakeupNeeded stays true unless the lock entry is deleted outright
	 * below (no waiters can exist for a deleted lock). */
851 	is_user_lock = (tableId == 0);
855 #ifdef USER_LOCKS_DEBUG
856 		elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
857 			 lockName->tupleId.ip_posid,
858 			 ((lockName->tupleId.ip_blkid.bi_hi << 16) +
859 			  lockName->tupleId.ip_blkid.bi_lo),
865 	Assert(tableId < NumTables);
866 	ltable = AllTables[tableId];
869 		elog(NOTICE, "ltable is null in LockRelease");
873 	if (LockingIsDisabled)
878 	LOCK_PRINT("Release", lockName, lockt);
880 	masterLock = ltable->ctl->masterLock;
881 	xidTable = ltable->xidHash;
883 	SpinAcquire(masterLock);
885 	Assert(ltable->lockHash->hash == tag_hash);
	/* HASH_FIND_SAVE presumably remembers the bucket so the later
	 * HASH_REMOVE_SAVED can reuse it without rehashing — confirm against
	 * the dynahash API. */
887 		hash_search(ltable->lockHash, (Pointer) lockName, HASH_FIND_SAVE, &found);
892 	 * If the entry is not found hash_search returns TRUE instead of NULL,
893 	 * so we must check it explicitly.
895 	if ((is_user_lock) && (lock == (LOCK *) TRUE))
897 		SpinRelease(masterLock);
898 		elog(NOTICE, "LockRelease: there are no locks with this tag");
904 	 * let the caller print its own error message, too. Do not elog(ERROR).
908 		SpinRelease(masterLock);
909 		elog(NOTICE, "LockRelease: locktable corrupted");
915 		SpinRelease(masterLock);
916 		elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
920 	Assert(lock->nHolding > 0);
925 	 * If this is an user lock it can be removed only after checking that
926 	 * it was acquired by the current process, so this code is skipped and
934 	 * fix the general lock stats
937 		lock->holders[lockt]--;
939 		lock->activeHolders[lockt]--;
941 		Assert(lock->nActive >= 0);
945 		/* ------------------
946 		 * if there's no one waiting in the queue,
947 		 * we just released the last lock.
948 		 * Delete it from the lock table.
951 			Assert(ltable->lockHash->hash == tag_hash);
952 			lock = (LOCK *) hash_search(ltable->lockHash,
953 										(Pointer) &(lock->tag),
956 			Assert(lock && found);
957 			wakeupNeeded = false;
963 	/* ------------------
964 	 * Zero out all of the tag bytes (this clears the padding bytes for long
965 	 * word alignment and ensures hashing consistency).
968 	MemSet(&item, 0, XID_TAGSIZE);
970 	TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
971 	item.tag.lock = MAKE_OFFSET(lock);
	/* Normal locks are keyed by (xid, lock); user locks by (pid, lock)
	 * with xid zero — branch selection elided here, confirm in full file. */
973 	item.tag.pid = MyPid;
979 		item.tag.pid = MyProcPid;
981 #ifdef USER_LOCKS_DEBUG
982 		elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
983 			 item.tag.lock, item.tag.pid, item.tag.xid);
988 	if (!(result = (XIDLookupEnt *) hash_search(xidTable,
994 		SpinRelease(masterLock);
996 		if ((is_user_lock) && (result))
998 			elog(NOTICE, "LockRelease: you don't have a lock on this tag");
1002 			elog(NOTICE, "LockRelease: find xid, table corrupted");
1005 			elog(NOTICE, "LockReplace: xid table corrupted");
1011 	 * now check to see if I have any private locks.  If I do, decrement
1012 	 * the counts associated with them.
1014 		result->holders[lockt]--;
1017 	XID_PRINT("LockRelease updated xid stats", result);
1020 	 * If this was my last hold on this lock, delete my entry in the XID
1023 	if (!result->nHolding)
1026 		if (result->queue.prev == INVALID_OFFSET)
1028 			elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
1030 		if (result->queue.next == INVALID_OFFSET)
1032 			elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
1035 		if (result->queue.next != INVALID_OFFSET)
1036 			SHMQueueDelete(&result->queue);
	/* Defensive checks above guard against a corrupted shared queue before
	 * unlinking and removing the saved xid entry. */
1037 		if (!(result = (XIDLookupEnt *)
1038 			  hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
1041 			SpinRelease(masterLock);
1043 			elog(NOTICE, "LockRelease: remove xid, table corrupted");
1045 			elog(NOTICE, "LockReplace: xid table corrupted");
1054 	 * If this is an user lock remove it now, after the corresponding xid
1055 	 * entry has been found and deleted.
1061 		 * fix the general lock stats
1064 		lock->holders[lockt]--;
1066 		lock->activeHolders[lockt]--;
1068 		Assert(lock->nActive >= 0);
1070 		if (!lock->nHolding)
1072 			/* ------------------
1073 			 * if there's no one waiting in the queue,
1074 			 * we just released the last lock.
1075 			 * Delete it from the lock table.
1076 			 * ------------------
1078 			Assert(ltable->lockHash->hash == tag_hash);
1079 			lock = (LOCK *) hash_search(ltable->lockHash,
1080 										(Pointer) &(lock->tag),
1083 			Assert(lock && found);
1084 			wakeupNeeded = false;
1089 	/* --------------------------
1090 	 * If there are still active locks of the type I just released, no one
1091 	 * should be woken up.	Whoever is asleep will still conflict
1092 	 * with the remaining locks.
1093 	 * --------------------------
1095 	if (!(lock->activeHolders[lockt]))
1097 		/* change the conflict mask.  No more of this lock type. */
1098 		lock->mask &= BITS_OFF[lockt];
1103 	/* --------------------------
1104 	 * Wake the first waiting process and grant him the lock if it
1105 	 * doesn't conflict.  The woken process must record the lock
1107 	 * --------------------------
1109 		ProcLockWakeup(&(lock->waitProcs), (char *) ltable, (char *) lock);
1112 		SpinRelease(masterLock);
/*
 * GrantLock() -- record that the requester now actively holds 'lock' in
 * mode 'lockt': bump the active-holder count for that type and set the
 * type's bit in the lock's conflict mask.
 */
1117  * GrantLock -- update the lock data structure to show
1118  *		the new lock holder.
1121 GrantLock(LOCK *lock, LOCKT lockt)
1124 	lock->activeHolders[lockt]++;
1125 	lock->mask |= BITS_ON[lockt];
1130  * LockReleaseAll -- Release all locks in a process lock queue.
1132  * Note: This code is a little complicated by the presence in the
1133  *		 same queue of user locks which can't be removed from the
1134  *		 normal lock queue at the end of a transaction.  They must
1135  *		 however be removed when the backend exits.
1136  *		 A dummy tableId 0 is used to indicate that we are releasing
1137  *		 the user locks, from the code added to ProcKill().
1141 LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
1143 	PROC_QUEUE *waitQueue;
1145 	XIDLookupEnt *xidLook = NULL;
1146 	XIDLookupEnt *tmp = NULL;
1147 	SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1148 	SPINLOCK	masterLock;
1156 	int			is_user_lock_table,
1160 	is_user_lock_table = (tableId == 0);
	/* tableId 0: releasing user locks at backend exit (see note above). */
1161 #ifdef USER_LOCKS_DEBUG
1162 	elog(NOTICE, "LockReleaseAll: tableId=%d, pid=%d", tableId, MyProcPid);
1164 	if (is_user_lock_table)
1170 	Assert(tableId < NumTables);
1171 	ltable = AllTables[tableId];
1175 	nLockTypes = ltable->ctl->nLockTypes;
1176 	masterLock = ltable->ctl->masterLock;
	/* Nothing to do if this process holds no locks. */
1178 	if (SHMQueueEmpty(lockQueue))
1182 	SpinAcquire(masterLock);
1184 	SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
	/* Walk the process's circular lock queue, releasing each xid entry. */
1186 	XID_PRINT("LockReleaseAll", xidLook);
1189 	SpinAcquire(masterLock);
1195 		/* ---------------------------
1196 		 * XXX Here we assume the shared memory queue is circular and
1197 		 * that we know its internal structure.  Should have some sort of
1198 		 * macros to allow one to walk it.	mer 20 July 1991
1199 		 * ---------------------------
1201 		done = (xidLook->queue.next == end);
1202 		lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1204 		LOCK_PRINT("ReleaseAll", (&lock->tag), 0);
1209 		 * Sometimes the queue appears to be messed up.
1213 			elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
	/* In user-lock mode release only this backend's user-lock entries
	 * (pid != 0, xid == 0); in normal mode skip user-lock entries. */
1217 		if (is_user_lock_table)
1219 			if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
1221 #ifdef USER_LOCKS_DEBUG
1222 				elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
1223 					 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1228 			if (xidLook->tag.pid != MyProcPid)
1230 				/* This should never happen */
1231 #ifdef USER_LOCKS_DEBUG
1233 					 "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
1234 					 lock->tag.tupleId.ip_posid,
1235 					 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1236 					  lock->tag.tupleId.ip_blkid.bi_lo),
1237 					 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1242 #ifdef USER_LOCKS_DEBUG
1244 				 "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
1245 				 lock->tag.tupleId.ip_posid,
1246 				 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1247 				  lock->tag.tupleId.ip_blkid.bi_lo),
1248 				 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1253 			if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
1255 #ifdef USER_LOCKS_DEBUG
1257 					 "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
1258 					 lock->tag.tupleId.ip_posid,
1259 					 ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1260 					  lock->tag.tupleId.ip_blkid.bi_lo),
1261 					 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1266 #ifdef USER_LOCKS_DEBUG
1267 			elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
1268 				 xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1273 		/* ------------------
1274 		 * fix the general lock stats
1275 		 * ------------------
1277 		if (lock->nHolding != xidLook->nHolding)
1279 			lock->nHolding -= xidLook->nHolding;
1280 			lock->nActive -= xidLook->nHolding;
1281 			Assert(lock->nActive >= 0);
	/* Subtract everything this transaction held, type by type (1-based),
	 * clearing the conflict-mask bit when a type's last hold goes away. */
1282 			for (i = 1; i <= nLockTypes; i++)
1284 				lock->holders[i] -= xidLook->holders[i];
1285 				lock->activeHolders[i] -= xidLook->holders[i];
1286 				if (!lock->activeHolders[i])
1287 					lock->mask &= BITS_OFF[i];
1293 			 * set nHolding to zero so that we can garbage collect the lock
1300 		 * always remove the xidLookup entry, we're done with it now
1304 		SHMQueueDelete(&xidLook->queue);
1306 		if ((!hash_search(ltable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
1309 			SpinRelease(masterLock);
1311 			elog(NOTICE, "LockReleaseAll: xid table corrupted");
1313 			elog(NOTICE, "LockReplace: xid table corrupted");
	/* If no one holds the lock any longer, garbage-collect the LOCK entry. */
1318 		if (!lock->nHolding)
1320 			/* --------------------
1321 			 * if there's no one waiting in the queue, we've just released
1323 			 * --------------------
1326 			Assert(ltable->lockHash->hash == tag_hash);
1328 				hash_search(ltable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
1329 			if ((!lock) || (!found))
1331 				SpinRelease(masterLock);
1333 				elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1335 				elog(NOTICE, "LockReplace: cannot remove lock from HTAB");
1342 			/* --------------------
1343 			 * Wake the first waiting process and grant him the lock if it
1344 			 * doesn't conflict.  The woken process must record the lock
1346 			 * --------------------
1348 			waitQueue = &(lock->waitProcs);
1349 			ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
1357 		SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1360 	SpinRelease(masterLock);
	/* Reset the queue head only when everything was actually released. */
1364 	 * Reinitialize the queue only if nothing has been left in.
1368 		SHMQueueInit(lockQueue);
/*
 * NOTE(review): size-estimation arithmetic (the enclosing function header,
 * LockShmemSize-style, is not visible here — confirm against full source).
 * Computes shared-memory space for per-backend PROC structs plus the lock
 * hash and xid hash tables (directory, header, segments, entries).
 * Possible bug to confirm: nXidSegs below is derived from nLockBuckets
 * rather than nXidBuckets.
 */
1381 	nLockBuckets = 1 << (int) my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1);
1382 	nLockSegs = 1 << (int) my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1384 	nXidBuckets = 1 << (int) my_log2((NLOCKS_PER_XACT - 1) / DEF_FFACTOR + 1);
1385 	nXidSegs = 1 << (int) my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1387 	size += MAXALIGN(NBACKENDS * sizeof(PROC));	/* each MyProc */
1388 	size += MAXALIGN(NBACKENDS * sizeof(LOCKCTL));		/* each ltable->ctl */
1389 	size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1391 	size += MAXALIGN(my_log2(NLOCKENTS) * sizeof(void *));
1392 	size += MAXALIGN(sizeof(HHDR));
1393 	size += nLockSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1394 	size += NLOCKENTS *			/* XXX not multiple of BUCKET_ALLOC_INCR? */
1395 		(MAXALIGN(sizeof(BUCKET_INDEX)) +
1396 		 MAXALIGN(sizeof(LOCK)));		/* contains hash key */
1398 	size += MAXALIGN(my_log2(NBACKENDS) * sizeof(void *));
1399 	size += MAXALIGN(sizeof(HHDR));
1400 	size += nXidSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1401 	size += NBACKENDS *			/* XXX not multiple of BUCKET_ALLOC_INCR? */
1402 		(MAXALIGN(sizeof(BUCKET_INDEX)) +
1403 		 MAXALIGN(sizeof(XIDLookupEnt)));		/* contains hash key */
/* Read accessor for the module-wide LockingIsDisabled flag. */
1408 /* -----------------
1409  * Boolean function to determine current locking status
1415 	return LockingIsDisabled;
1418 #ifdef DEADLOCK_DEBUG
1420 * Dump all locks. Must have already acquired the masterLock.
1425 SHMEM_OFFSET location;
1427 SHM_QUEUE *lockQueue;
1429 XIDLookupEnt *xidLook = NULL;
1430 XIDLookupEnt *tmp = NULL;
1432 SPINLOCK masterLock;
1439 ShmemPIDLookup(MyProcPid, &location);
1440 if (location == INVALID_OFFSET)
1442 proc = (PROC *) MAKE_PTR(location);
1445 lockQueue = &proc->lockQueue;
1447 Assert(tableId < NumTables);
1448 ltable = AllTables[tableId];
1452 nLockTypes = ltable->ctl->nLockTypes;
1453 masterLock = ltable->ctl->masterLock;
1455 if (SHMQueueEmpty(lockQueue))
1458 SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1459 end = MAKE_OFFSET(lockQueue);
1461 LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
1462 XID_PRINT("DumpLocks", xidLook);
1466 /* ---------------------------
1467 * XXX Here we assume the shared memory queue is circular and
1468 * that we know its internal structure. Should have some sort of
1469 * macros to allow one to walk it. mer 20 July 1991
1470 * ---------------------------
1472 done = (xidLook->queue.next == end);
1473 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1475 LOCK_DUMP("DumpLocks", lock, 0);
1479 elog(NOTICE, "DumpLocks: xid loop detected, giving up");
1485 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);