1 /*-------------------------------------------------------------------------
4 * simple lock acquisition
6 * Copyright (c) 1994, Regents of the University of California
10 * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.3 1996/10/11 03:22:56 scrappy Exp $
13 * Outside modules can create a lock table and acquire/release
14 * locks. A lock table is a shared memory hash table. When
15 * a process tries to acquire a lock of a type that conflicts
16 * with existing locks, it is put to sleep using the routines
17 * in storage/lmgr/proc.c.
21 * LockAcquire(), LockRelease(), LockTabInit().
23 * LockReplace() is called only within this module and by the
24 * lkchain module. It releases a lock without looking
25 * the lock up in the lock table.
27 * NOTE: This module is used to define new lock tables. The
28 * multi-level lock table (multi.c) used by the heap
29 * access methods calls these routines. See multi.c for
30 * examples showing how to use this interface.
32 *-------------------------------------------------------------------------
34 #include <stdio.h> /* for sprintf() */
35 #include "storage/shmem.h"
36 #include "storage/spin.h"
37 #include "storage/proc.h"
38 #include "storage/lock.h"
39 #include "utils/hsearch.h"
40 #include "utils/elog.h"
41 #include "utils/palloc.h"
42 #include "access/xact.h"
/* ------------------------------------------------------------------
 * Debug tracing macros.  When LOCK_MGR_DEBUG is not defined they
 * expand to nothing; when it is defined they elog() a NOTICE with
 * the lock tag / lock / xid-entry contents.
 * NOTE(review): this listing is elided — several continuation lines
 * of these macros are missing, so they are shown incomplete here.
 * ------------------------------------------------------------------ */
44 /*#define LOCK_MGR_DEBUG*/
46 #ifndef LOCK_MGR_DEBUG
48 #define LOCK_PRINT(where,tag,type)
49 #define LOCK_DUMP(where,lock,type)
50 #define XID_PRINT(where,xidentP)
52 #else /* LOCK_MGR_DEBUG */
/* Print a lock tag: relation oid, database oid, tid and lock type.
 * A negative block id is printed as -1 (unset). */
54 #define LOCK_PRINT(where,tag,type)\
55 elog(NOTICE, "%s: rel (%d) dbid (%d) tid (%d,%d) type (%d)\n",where, \
56 tag->relId, tag->dbId, \
57 ( (tag->tupleId.ip_blkid.data[0] >= 0) ? \
58 BlockIdGetBlockNumber(&tag->tupleId.ip_blkid) : -1 ), \
59 tag->tupleId.ip_posid, \
62 #define LOCK_DUMP(where,lock,type)\
63 elog(NOTICE, "%s: rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) holders (%d,%d,%d,%d,%d) type (%d)\n",where, \
64 lock->tag.relId, lock->tag.dbId, \
65 ((lock->tag.tupleId.ip_blkid.data[0] >= 0) ? \
66 BlockIdGetBlockNumber(&lock->tag.tupleId.ip_blkid) : -1 ), \
67 lock->tag.tupleId.ip_posid, \
76 #define XID_PRINT(where,xidentP)\
78 "%s:xid (%d) pid (%d) lock (%x) nHolding (%d) holders (%d,%d,%d,%d,%d)",\
90 #endif /* LOCK_MGR_DEBUG */
/* ------------------------------------------------------------------
 * Module-level lock manager state.
 * ------------------------------------------------------------------ */
92 SPINLOCK LockMgrLock; /* in Shmem or created in CreateSpinlocks() */
94 /* This is to simplify/speed up some bit arithmetic */
/* BITS_ON[t] has only the bit for lock type t set; BITS_OFF[t] is its
 * complement — used to set/clear lock->mask without shifting each time. */
96 static MASK BITS_OFF[MAX_LOCKTYPES];
97 static MASK BITS_ON[MAX_LOCKTYPES];
100 * XXX Want to move this to this file
/* When TRUE, LockAcquire/LockRelease/LockReleaseAll return early doing
 * nothing (see the LockingIsDisabled checks below). */
103 static bool LockingIsDisabled;
105 /* -------------------
106 * map from tableId to the lock table structure
107 * -------------------
109 static LOCKTAB *AllTables[MAX_TABLES];
111 /* -------------------
113 * -------------------
/* Count of allocated lock tables.  Starts at 1 because tableId 0 is the
 * dummy id used for user locks (see LockAcquire's header comment). */
115 static int NumTables = 1;
117 /* -------------------
118 * InitLocks -- Init the lock module. Create a private data
119 * structure for constructing conflict masks.
120 * -------------------
129 /* -------------------
130 * remember 0th locktype is invalid
131 * -------------------
/* Populate BITS_ON/BITS_OFF, one bit per lock type.  The loop body is
 * elided in this listing; presumably it stores `bit` and `~bit`. */
133 for (i=0;i<MAX_LOCKTYPES;i++,bit <<= 1)
140 /* -------------------
141 * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
/* status: nonzero disables all locking operations, zero re-enables them.
 * Used during bootstrap/recovery paths — TODO confirm callers. */
145 LockDisable(int status)
147 LockingIsDisabled = status;
152 * LockTypeInit -- initialize the lock table's lock type
155 * Notes: just copying. Should only be called once.
/* Copies the caller's conflict-mask and priority arrays (ntypes entries)
 * into the shared LOCKCTL.  No locking here: caller (LockTabInit) holds
 * the master spinlock. */
158 LockTypeInit(LOCKTAB *ltable,
165 ltable->ctl->nLockTypes = ntypes;
167 for (i=0;i<ntypes;i++,prioP++,conflictsP++)
169 ltable->ctl->conflictTab[i] = *conflictsP;
170 ltable->ctl->prio[i] = *prioP;
175 * LockTabInit -- initialize a lock table structure
178 * (a) a lock table has four separate entries in the binding
179 * table. This is because every shared hash table and spinlock
180 * has its name stored in the binding table at its creation. It
181 * is wasteful, in this case, but not much space is involved.
/*
 * Creates (or attaches to) the shared control struct, the lock hash and
 * the xid hash for a new lock table, registers it in AllTables[], and
 * returns its tableId.  Returns INVALID_TABLEID on any failure.
 * tabName is used as the prefix for the binding-table names.
 */
185 LockTabInit(char *tabName,
197 if (ntypes > MAX_LOCKTYPES)
199 elog(NOTICE,"LockTabInit: too many lock types %d greater than %d",
200 ntypes,MAX_LOCKTYPES);
201 return(INVALID_TABLEID);
204 if (NumTables > MAX_TABLES)
207 "LockTabInit: system limit of MAX_TABLES (%d) lock tables",
209 return(INVALID_TABLEID);
212 /* allocate a string for the binding table lookup */
/* +32 leaves room for the " (ctl)" / " (lock hash)" / " (xid hash)" suffixes */
213 shmemName = (char *) palloc((unsigned)(strlen(tabName)+32));
216 elog(NOTICE,"LockTabInit: couldn't malloc string %s \n",tabName);
217 return(INVALID_TABLEID);
220 /* each lock table has a non-shared header */
221 ltable = (LOCKTAB *) palloc((unsigned) sizeof(LOCKTAB));
224 elog(NOTICE,"LockTabInit: couldn't malloc lock table %s\n",tabName);
225 (void) pfree (shmemName);
226 return(INVALID_TABLEID);
229 /* ------------------------
230 * find/acquire the spinlock for the table
231 * ------------------------
233 SpinAcquire(LockMgrLock);
236 /* -----------------------
237 * allocate a control structure from shared memory or attach to it
238 * if it already exists.
239 * -----------------------
241 sprintf(shmemName,"%s (ctl)",tabName);
242 ltable->ctl = (LOCKCTL *)
243 ShmemInitStruct(shmemName,(unsigned)sizeof(LOCKCTL),&found);
247 elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
252 * we're first - initialize
257 memset(ltable->ctl, 0, sizeof(LOCKCTL));
258 ltable->ctl->masterLock = LockMgrLock;
259 ltable->ctl->tableId = NumTables;
262 /* --------------------
263 * other modules refer to the lock table by a tableId
264 * --------------------
266 AllTables[NumTables] = ltable;
268 Assert(NumTables <= MAX_TABLES);
270 /* ----------------------
271 * allocate a hash table for the lock tags. This is used
272 * to find the different locks.
273 * ----------------------
275 info.keysize = sizeof(LOCKTAG);
276 info.datasize = sizeof(LOCK);
277 info.hash = tag_hash;
278 hash_flags = (HASH_ELEM | HASH_FUNCTION);
280 sprintf(shmemName,"%s (lock hash)",tabName);
281 ltable->lockHash = (HTAB *) ShmemInitHash(shmemName,
282 INIT_TABLE_SIZE,MAX_TABLE_SIZE,
/* NOTE(review): this Assert dereferences ltable->lockHash BEFORE the
 * NULL check on the next line — it would crash, not elog, on failure. */
285 Assert( ltable->lockHash->hash == tag_hash);
286 if (! ltable->lockHash)
288 elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
292 /* -------------------------
293 * allocate an xid table. When different transactions hold
294 * the same lock, additional information must be saved (locks per tx).
295 * -------------------------
297 info.keysize = XID_TAGSIZE;
298 info.datasize = sizeof(XIDLookupEnt);
299 info.hash = tag_hash;
300 hash_flags = (HASH_ELEM | HASH_FUNCTION);
302 sprintf(shmemName,"%s (xid hash)",tabName);
303 ltable->xidHash = (HTAB *) ShmemInitHash(shmemName,
304 INIT_TABLE_SIZE,MAX_TABLE_SIZE,
307 if (! ltable->xidHash)
309 elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
313 /* init ctl data structures */
314 LockTypeInit(ltable, conflictsP, prioP, ntypes);
316 SpinRelease(LockMgrLock);
318 (void) pfree (shmemName);
321 return(ltable->ctl->tableId);
323 return(INVALID_TABLEID);
327 * LockTabRename -- allocate another tableId to the same
330 * NOTES: Both the lock module and the lock chain (lchain.c)
331 * module use table id's to distinguish between different
332 * kinds of locks. Short term and long term locks look
333 * the same to the lock table, but are handled differently
334 * by the lock chain manager. This function allows the
335 * client to use different tableIds when acquiring/releasing
336 * short term and long term locks.
/* Aliases an existing lock table under a fresh tableId; returns the new
 * id, or INVALID_TABLEID if the table array is full or tableId is bad. */
339 LockTabRename(LockTableId tableId)
341 LockTableId newTableId;
343 if (NumTables >= MAX_TABLES)
345 return(INVALID_TABLEID);
/* NOTE(review): comparing a LOCKTAB* slot against INVALID_TABLEID (an
 * id constant) looks type-confused — presumably it relies on both
 * being 0/NULL; confirm against the definition of INVALID_TABLEID. */
347 if (AllTables[tableId] == INVALID_TABLEID)
349 return(INVALID_TABLEID);
352 /* other modules refer to the lock table by a tableId */
353 newTableId = NumTables;
356 AllTables[newTableId] = AllTables[tableId];
361 * LockAcquire -- Check for lock conflicts, sleep if conflict found,
362 * set lock if/when no conflicts.
364 * Returns: TRUE if parameters are correct, FALSE otherwise.
366 * Side Effects: The lock is always acquired. No way to abort
367 * a lock acquisition other than aborting the transaction.
368 * Lock is recorded in the lkchain.
370 * Note on User Locks:
371 * User locks are handled totally on the application side as
372 * long term cooperative locks which extend beyond the normal
373 * transaction boundaries. Their purpose is to indicate to an
374 * application that someone is `working' on an item. So it is
375 * possible to put an user lock on a tuple's oid, retrieve the
376 * tuple, work on it for an hour and then update it and remove
377 * the lock. While the lock is active other clients can still
378 * read and write the tuple but they can be aware that it has
379 * been locked at the application level by someone.
380 * User locks use lock tags made of an uint16 and an uint32, for
381 * example 0 and a tuple oid, or any other arbitrary pair of
382 * numbers following a convention established by the application.
383 * In this sense tags don't refer to tuples or database entities.
384 * User locks and normal locks are completely orthogonal and
385 * they don't interfere with each other, so it is possible
386 * to acquire a normal lock on an user-locked tuple or user-lock
387 * a tuple for which a normal write lock already exists.
388 * User locks are always non blocking, therefore they are never
389 * acquired if already held by another process. They must be
390 * released explicitly by the application but they are released
391 * automatically when a backend terminates.
392 * They are indicated by a dummy tableId 0 which doesn't have
393 * any table allocated but uses the normal lock table, and are
394 * distinguished from normal locks for the following differences:
396 * normal lock user lock
399 * tag.relId rel oid 0
400 * tag.ItemPointerData.ip_blkid block id lock id2
401 * tag.ItemPointerData.ip_posid tuple offset lock id1
402 * xid.pid 0 backend pid
403 * xid.xid current xid 0
404 * persistence transaction user or backend
406 * The lockt parameter can have the same values for normal locks
407 * although probably only WRITE_LOCK can have some practical use.
413 LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
415 XIDLookupEnt *result,item;
/* tableId 0 is the dummy user-lock table (see header comment above) */
427 is_user_lock = (tableId == 0);
430 #ifdef USER_LOCKS_DEBUG
431 elog(NOTICE,"LockAcquire: user lock tag [%u,%u] %d",
432 lockName->tupleId.ip_posid,
433 ((lockName->tupleId.ip_blkid.bi_hi<<16)+
434 lockName->tupleId.ip_blkid.bi_lo),
440 Assert (tableId < NumTables);
441 ltable = AllTables[tableId];
444 elog(NOTICE,"LockAcquire: bad lock table %d",tableId);
448 if (LockingIsDisabled)
453 LOCK_PRINT("Acquire",lockName,lockt);
454 masterLock = ltable->ctl->masterLock;
456 SpinAcquire(masterLock);
/* find or create the LOCK entry for this tag */
458 Assert( ltable->lockHash->hash == tag_hash);
459 lock = (LOCK *)hash_search(ltable->lockHash,(Pointer)lockName,HASH_ENTER,&found);
463 SpinRelease(masterLock);
464 elog(FATAL,"LockAcquire: lock table %d is corrupted",tableId);
468 /* --------------------
469 * if there was nothing else there, complete initialization
470 * --------------------
475 ProcQueueInit(&(lock->waitProcs));
476 memset((char *)lock->holders, 0, sizeof(int)*MAX_LOCKTYPES);
477 memset((char *)lock->activeHolders, 0, sizeof(int)*MAX_LOCKTYPES);
481 Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
482 &(lockName->tupleId.ip_blkid)));
486 /* ------------------
487 * add an element to the lock queue so that we can clear the
488 * locks at end of transaction.
491 xidTable = ltable->xidHash;
492 myXid = GetCurrentTransactionId();
494 /* ------------------
495 * Zero out all of the tag bytes (this clears the padding bytes for long
496 * word alignment and ensures hashing consistency).
499 memset(&item, 0, XID_TAGSIZE);
500 TransactionIdStore(myXid, &item.tag.xid);
501 item.tag.lock = MAKE_OFFSET(lock);
503 item.tag.pid = MyPid;
/* user locks key the xid entry by backend pid instead of xid */
508 item.tag.pid = getpid();
509 item.tag.xid = myXid = 0;
510 #ifdef USER_LOCKS_DEBUG
511 elog(NOTICE,"LockAcquire: user lock xid [%d,%d,%d]",
512 item.tag.lock, item.tag.pid, item.tag.xid);
517 result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found);
/* NOTE(review): this error path appears to return without releasing
 * masterLock — confirm against the elided lines; looks like a
 * spinlock leak. */
520 elog(NOTICE,"LockAcquire: xid table corrupted");
521 return(STATUS_ERROR);
525 XID_PRINT("queueing XidEnt LockAcquire:", result);
526 ProcAddLock(&result->queue);
527 result->nHolding = 0;
528 memset((char *)result->holders, 0, sizeof(int)*MAX_LOCKTYPES);
532 * lock->nholding tells us how many processes have _tried_ to
533 * acquire this lock, Regardless of whether they succeeded or
534 * failed in doing so.
538 lock->holders[lockt]++;
540 /* --------------------
541 * If I'm the only one holding a lock, then there
542 * cannot be a conflict. Need to subtract one from the
543 * lock's count since we just bumped the count up by 1
545 * --------------------
/* fast path: every active holder is me, so grant immediately */
547 if (result->nHolding == lock->nActive)
549 result->holders[lockt]++;
551 GrantLock(lock, lockt);
552 SpinRelease(masterLock);
556 Assert(result->nHolding <= lock->nActive);
558 status = LockResolveConflicts(ltable, lock, lockt, myXid);
560 if (status == STATUS_OK)
562 GrantLock(lock, lockt);
564 else if (status == STATUS_FOUND)
568 * User locks are non blocking. If we can't acquire a lock
569 * remove the xid entry and return FALSE without waiting.
572 if (!result->nHolding) {
573 SHMQueueDelete(&result->queue);
574 hash_search(xidTable, (Pointer)&item, HASH_REMOVE, &found);
/* undo the speculative holders bump from above */
577 lock->holders[lockt]--;
578 SpinRelease(masterLock);
579 #ifdef USER_LOCKS_DEBUG
580 elog(NOTICE,"LockAcquire: user lock failed");
/* normal locks block: sleep until granted (or aborted by deadlock) */
585 status = WaitOnLock(ltable, tableId, lock, lockt);
586 XID_PRINT("Someone granted me the lock", result);
589 SpinRelease(masterLock);
591 return(status == STATUS_OK);
594 /* ----------------------------
595 * LockResolveConflicts -- test for lock conflicts
598 * Here's what makes this complicated: one transaction's
599 * locks don't conflict with one another. When many processes
600 * hold locks, each has to subtract off the other's locks when
601 * determining whether or not any new lock acquired conflicts with
604 * For example, if I am already holding a WRITE_INTENT lock,
605 * there will not be a conflict with my own READ_LOCK. If I
606 * don't consider the intent lock when checking for conflicts,
607 * I find no conflict.
608 * ----------------------------
/* Returns STATUS_OK if lockt can be granted to xid now (and updates the
 * caller's xid-entry stats), STATUS_FOUND if it conflicts with locks
 * held by OTHER transactions, STATUS_ERROR if the xid table is corrupt.
 * Caller must hold the master spinlock. */
611 LockResolveConflicts(LOCKTAB *ltable,
616 XIDLookupEnt *result,item;
624 nLockTypes = ltable->ctl->nLockTypes;
625 xidTable = ltable->xidHash;
627 /* ---------------------
628 * read my own statistics from the xid table. If there
629 * isn't an entry, then we'll just add one.
631 * Zero out the tag, this clears the padding bytes for long
632 * word alignment and ensures hashing consistency.
635 memset(&item, 0, XID_TAGSIZE);
636 TransactionIdStore(xid, &item.tag.xid);
637 item.tag.lock = MAKE_OFFSET(lock);
642 if (! (result = (XIDLookupEnt *)
643 hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found)))
645 elog(NOTICE,"LockResolveConflicts: xid table corrupted");
646 return(STATUS_ERROR);
648 myHolders = result->holders;
653 * we're not holding any type of lock yet. Clear
657 memset(result->holders, 0, nLockTypes * sizeof(*(lock->holders)));
658 result->nHolding = 0;
661 /* ----------------------------
662 * first check for global conflicts: If no locks conflict
663 * with mine, then I get the lock.
665 * Checking for conflict: lock->mask represents the types of
666 * currently held locks. conflictTable[lockt] has a bit
667 * set for each type of lock that conflicts with mine. Bitwise
668 * compare tells if there is a conflict.
669 * ----------------------------
671 if (! (ltable->ctl->conflictTab[lockt] & lock->mask))
674 result->holders[lockt]++;
677 XID_PRINT("Conflict Resolved: updated xid entry stats", result);
682 /* ------------------------
683 * Rats. Something conflicts. But it could still be my own
684 * lock. We have to construct a conflict mask
685 * that does not reflect our own locks.
686 * ------------------------
/* build bitmask of lock types held by transactions other than mine */
690 for (i=1;i<=nLockTypes;i++, tmpMask <<= 1)
692 if (lock->activeHolders[i] - myHolders[i])
698 /* ------------------------
699 * now check again for conflicts. 'bitmask' describes the types
700 * of locks held by other processes. If one of these
701 * conflicts with the kind of lock that I want, there is a
702 * conflict and I have to sleep.
703 * ------------------------
705 if (! (ltable->ctl->conflictTab[lockt] & bitmask))
708 /* no conflict. Get the lock and go on */
710 result->holders[lockt]++;
713 XID_PRINT("Conflict Resolved: updated xid entry stats", result);
/* genuine conflict with another transaction — caller must wait */
719 return(STATUS_FOUND);
/* Put this process to sleep on the lock's wait queue until the lock is
 * granted.  Called with the master spinlock held; ProcSleep releases
 * and reacquires it.  On wakeup-error the transaction is aborted. */
723 WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock, LOCKT lockt)
725 PROC_QUEUE *waitQueue = &(lock->waitProcs);
727 int prio = ltable->ctl->prio[lockt];
729 /* the waitqueue is ordered by priority. I insert myself
730 * according to the priority of the lock I am acquiring.
732 * SYNC NOTE: I am assuming that the lock table spinlock
733 * is sufficient synchronization for this queue. That
734 * will not be true if/when people can be deleted from
735 * the queue by a SIGINT or something.
737 LOCK_DUMP("WaitOnLock: sleeping on lock", lock, lockt);
738 if (ProcSleep(waitQueue,
739 ltable->ctl->masterLock,
744 /* -------------------
745 * This could have happened as a result of a deadlock, see HandleDeadLock()
746 * Decrement the lock nHolding and holders fields as we are no longer
747 * waiting on this lock.
748 * -------------------
751 lock->holders[lockt]--;
752 LOCK_DUMP("WaitOnLock: aborting on lock", lock, lockt);
753 SpinRelease(ltable->ctl->masterLock);
754 elog(WARN,"WaitOnLock: error on wakeup - Aborting this transaction");
761 * LockRelease -- look up 'lockName' in lock table 'tableId' and
764 * Side Effects: if the lock no longer conflicts with the highest
765 * priority waiting process, that process is granted the lock
766 * and awoken. (We have to grant the lock here to avoid a
767 * race between the waking process and any new process to
768 * come along and request the lock).
/* Releases one hold of type lockt.  Returns FALSE (via NOTICE paths) on
 * lookup failures; user locks (tableId 0) are only removed after the
 * owning process's xid entry is verified. */
771 LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
777 XIDLookupEnt *result,item;
779 bool wakeupNeeded = true;
784 is_user_lock = (tableId == 0);
787 #ifdef USER_LOCKS_DEBUG
788 elog(NOTICE,"LockRelease: user lock tag [%u,%u] %d",
789 lockName->tupleId.ip_posid,
790 ((lockName->tupleId.ip_blkid.bi_hi<<16)+
791 lockName->tupleId.ip_blkid.bi_lo),
797 Assert (tableId < NumTables);
798 ltable = AllTables[tableId];
800 elog(NOTICE, "ltable is null in LockRelease");
804 if (LockingIsDisabled)
809 LOCK_PRINT("Release",lockName,lockt);
811 masterLock = ltable->ctl->masterLock;
812 xidTable = ltable->xidHash;
814 SpinAcquire(masterLock);
/* HASH_FIND_SAVE remembers the bucket so the later HASH_REMOVE_SAVED
 * on the xid table can skip the re-hash */
816 Assert( ltable->lockHash->hash == tag_hash);
818 hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND_SAVE,&found);
822 * If the entry is not found hash_search returns TRUE
823 * instead of NULL, so we must check it explicitly.
825 if ((is_user_lock) && (lock == (LOCK *)TRUE)) {
826 SpinRelease(masterLock);
827 elog(NOTICE,"LockRelease: there are no locks with this tag");
832 /* let the caller print its own error message, too.
837 SpinRelease(masterLock);
838 elog(NOTICE,"LockRelease: locktable corrupted");
844 SpinRelease(masterLock);
845 elog(NOTICE,"LockRelease: locktable lookup failed, no lock");
849 Assert(lock->nHolding > 0);
853 * If this is an user lock it can be removed only after
854 * checking that it was acquired by the current process,
855 * so this code is skipped and executed later.
860 * fix the general lock stats
863 lock->holders[lockt]--;
865 lock->activeHolders[lockt]--;
867 Assert(lock->nActive >= 0);
869 if (! lock->nHolding)
871 /* ------------------
872 * if there's no one waiting in the queue,
873 * we just released the last lock.
874 * Delete it from the lock table.
877 Assert( ltable->lockHash->hash == tag_hash);
878 lock = (LOCK *) hash_search(ltable->lockHash,
879 (Pointer) &(lock->tag),
882 Assert(lock && found);
883 wakeupNeeded = false;
889 /* ------------------
890 * Zero out all of the tag bytes (this clears the padding bytes for long
891 * word alignment and ensures hashing consistency).
894 memset(&item, 0, XID_TAGSIZE);
896 TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
897 item.tag.lock = MAKE_OFFSET(lock);
899 item.tag.pid = MyPid;
/* user locks are keyed by pid, not xid (mirrors LockAcquire) */
904 item.tag.pid = getpid();
906 #ifdef USER_LOCKS_DEBUG
907 elog(NOTICE,"LockRelease: user lock xid [%d,%d,%d]",
908 item.tag.lock, item.tag.pid, item.tag.xid);
913 if (! ( result = (XIDLookupEnt *) hash_search(xidTable,
919 SpinRelease(masterLock);
921 if ((is_user_lock) && (result)) {
922 elog(NOTICE,"LockRelease: you don't have a lock on this tag");
924 elog(NOTICE,"LockRelease: find xid, table corrupted");
927 elog(NOTICE,"LockReplace: xid table corrupted");
932 * now check to see if I have any private locks. If I do,
933 * decrement the counts associated with them.
935 result->holders[lockt]--;
938 XID_PRINT("LockRelease updated xid stats", result);
941 * If this was my last hold on this lock, delete my entry
944 if (! result->nHolding)
947 if (result->queue.prev == INVALID_OFFSET) {
948 elog(NOTICE,"LockRelease: xid.prev == INVALID_OFFSET");
950 if (result->queue.next == INVALID_OFFSET) {
951 elog(NOTICE,"LockRelease: xid.next == INVALID_OFFSET");
954 if (result->queue.next != INVALID_OFFSET)
955 SHMQueueDelete(&result->queue);
956 if (! (result = (XIDLookupEnt *)
957 hash_search(xidTable, (Pointer)&item, HASH_REMOVE_SAVED, &found)) ||
960 SpinRelease(masterLock);
962 elog(NOTICE,"LockRelease: remove xid, table corrupted");
964 elog(NOTICE,"LockReplace: xid table corrupted");
972 * If this is an user lock remove it now, after the
973 * corresponding xid entry has been found and deleted.
977 * fix the general lock stats
980 lock->holders[lockt]--;
982 lock->activeHolders[lockt]--;
984 Assert(lock->nActive >= 0);
986 if (! lock->nHolding)
988 /* ------------------
989 * if there's no one waiting in the queue,
990 * we just released the last lock.
991 * Delete it from the lock table.
994 Assert( ltable->lockHash->hash == tag_hash);
995 lock = (LOCK *) hash_search(ltable->lockHash,
996 (Pointer) &(lock->tag),
999 Assert(lock && found);
1000 wakeupNeeded = false;
1005 /* --------------------------
1006 * If there are still active locks of the type I just released, no one
1007 * should be woken up. Whoever is asleep will still conflict
1008 * with the remaining locks.
1009 * --------------------------
1011 if (! (lock->activeHolders[lockt]))
1013 /* change the conflict mask. No more of this lock type. */
1014 lock->mask &= BITS_OFF[lockt];
1019 /* --------------------------
1020 * Wake the first waiting process and grant him the lock if it
1021 * doesn't conflict. The woken process must record the lock
1023 * --------------------------
1025 (void) ProcLockWakeup(&(lock->waitProcs), (char *) ltable, (char *) lock);
1028 SpinRelease(masterLock);
1033 * GrantLock -- update the lock data structure to show
1034 * the new lock holder.
/* Bumps the active-holder count for lockt and sets its bit in the
 * conflict mask.  Caller must hold the master spinlock. */
1037 GrantLock(LOCK *lock, LOCKT lockt)
1040 lock->activeHolders[lockt]++;
1041 lock->mask |= BITS_ON[lockt];
1046 * LockReleaseAll -- Release all locks in a process lock queue.
1048 * Note: This code is a little complicated by the presence in the
1049 * same queue of user locks which can't be removed from the
1050 * normal lock queue at the end of a transaction. They must
1051 * however be removed when the backend exits.
1052 * A dummy tableId 0 is used to indicate that we are releasing
1053 * the user locks, from the code added to ProcKill().
/* Walks the process's shared-memory lock queue, releasing either all
 * normal locks (end of transaction) or all user locks owned by this
 * pid (tableId 0, backend exit), waking waiters as locks free up. */
1057 LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
1059 PROC_QUEUE *waitQueue;
1061 XIDLookupEnt *xidLook = NULL;
1062 XIDLookupEnt *tmp = NULL;
1063 SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1064 SPINLOCK masterLock;
/* NOTE(review): my_pid's assignment is not visible in this elided
 * listing — presumably set via getpid(); confirm before relying on
 * the pid comparisons below. */
1071 int is_user_lock_table, my_pid, count, nskip;
1073 is_user_lock_table = (tableId == 0);
1075 #ifdef USER_LOCKS_DEBUG
1076 elog(NOTICE,"LockReleaseAll: tableId=%d, pid=%d", tableId, my_pid);
1078 if (is_user_lock_table) {
1083 Assert (tableId < NumTables);
1084 ltable = AllTables[tableId];
1088 nLockTypes = ltable->ctl->nLockTypes;
1089 masterLock = ltable->ctl->masterLock;
1091 if (SHMQueueEmpty(lockQueue))
1095 SpinAcquire(masterLock);
1097 SHMQueueFirst(lockQueue,(Pointer*)&xidLook,&xidLook->queue);
1099 XID_PRINT("LockReleaseAll:", xidLook);
1102 SpinAcquire(masterLock);
1108 /* ---------------------------
1109 * XXX Here we assume the shared memory queue is circular and
1110 * that we know its internal structure. Should have some sort of
1111 * macros to allow one to walk it. mer 20 July 1991
1112 * ---------------------------
1114 done = (xidLook->queue.next == end);
1115 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1117 LOCK_PRINT("ReleaseAll",(&lock->tag),0);
1121 * Sometimes the queue appears to be messed up.
/* defensive guard: bail out of a corrupted (looping) queue */
1123 if (count++ > 2000) {
1124 elog(NOTICE,"LockReleaseAll: xid loop detected, giving up");
/* user-lock pass: skip entries that belong to transactions (xid != 0)
 * or have no pid, and entries owned by other backends */
1128 if (is_user_lock_table) {
1129 if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0)) {
1130 #ifdef USER_LOCKS_DEBUG
1131 elog(NOTICE,"LockReleaseAll: skip normal lock [%d,%d,%d]",
1132 xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1137 if (xidLook->tag.pid != my_pid) {
1138 /* This should never happen */
1139 #ifdef USER_LOCKS_DEBUG
1141 "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
1142 lock->tag.tupleId.ip_posid,
1143 ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
1144 lock->tag.tupleId.ip_blkid.bi_lo),
1145 xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1150 #ifdef USER_LOCKS_DEBUG
1152 "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
1153 lock->tag.tupleId.ip_posid,
1154 ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
1155 lock->tag.tupleId.ip_blkid.bi_lo),
1156 xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
/* normal pass: conversely skip user-lock entries (pid set, xid 0) */
1159 if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0)) {
1160 #ifdef USER_LOCKS_DEBUG
1162 "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
1163 lock->tag.tupleId.ip_posid,
1164 ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
1165 lock->tag.tupleId.ip_blkid.bi_lo),
1166 xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1171 #ifdef USER_LOCKS_DEBUG
1172 elog(NOTICE,"LockReleaseAll: release normal lock [%d,%d,%d]",
1173 xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1178 /* ------------------
1179 * fix the general lock stats
1180 * ------------------
1182 if (lock->nHolding != xidLook->nHolding)
1184 lock->nHolding -= xidLook->nHolding;
1185 lock->nActive -= xidLook->nHolding;
1186 Assert(lock->nActive >= 0);
1187 for (i=1; i<=nLockTypes; i++)
1189 lock->holders[i] -= xidLook->holders[i];
1190 lock->activeHolders[i] -= xidLook->holders[i];
1191 if (! lock->activeHolders[i])
1192 lock->mask &= BITS_OFF[i];
1198 * set nHolding to zero so that we can garbage collect the lock
1205 * always remove the xidLookup entry, we're done with it now
1209 SHMQueueDelete(&xidLook->queue);
1211 if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found))
1214 SpinRelease(masterLock);
1216 elog(NOTICE,"LockReleaseAll: xid table corrupted");
1218 elog(NOTICE,"LockReplace: xid table corrupted");
1223 if (! lock->nHolding)
1225 /* --------------------
1226 * if there's no one waiting in the queue, we've just released
1228 * --------------------
1231 Assert( ltable->lockHash->hash == tag_hash);
1233 hash_search(ltable->lockHash,(Pointer)&(lock->tag),HASH_REMOVE, &found);
1234 if ((! lock) || (!found))
1236 SpinRelease(masterLock);
1238 elog(NOTICE,"LockReleaseAll: cannot remove lock from HTAB");
1240 elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
1247 /* --------------------
1248 * Wake the first waiting process and grant him the lock if it
1249 * doesn't conflict. The woken process must record the lock
1251 * --------------------
1253 waitQueue = &(lock->waitProcs);
1254 (void) ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
/* advance to the next queue entry before the current one is reused */
1262 SHMQueueFirst(&xidLook->queue,(Pointer*)&tmp,&tmp->queue);
1265 SpinRelease(masterLock);
1268 * Reinitialize the queue only if nothing has been left in.
1272 SHMQueueInit(lockQueue);
/* Shared-memory sizing fragment (the enclosing function header is
 * elided in this listing — presumably LockShmemSize): estimates bytes
 * needed for PROC structs, lock-table control, and both hash tables. */
1280 int nLockBuckets, nLockSegs;
1281 int nXidBuckets, nXidSegs;
1283 nLockBuckets = 1 << (int)my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1);
1284 nLockSegs = 1 << (int)my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1286 nXidBuckets = 1 << (int)my_log2((NLOCKS_PER_XACT-1) / DEF_FFACTOR + 1);
/* NOTE(review): uses nLockBuckets, not nXidBuckets — looks like a
 * copy/paste slip; the xid-table segment count should presumably be
 * derived from nXidBuckets. Harmless only if it over-estimates. */
1287 nXidSegs = 1 << (int)my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1289 size += MAXALIGN(NBACKENDS * sizeof(PROC)); /* each MyProc */
1290 size += MAXALIGN(NBACKENDS * sizeof(LOCKCTL)); /* each ltable->ctl */
1291 size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
/* lock hash: directory, header, segments, and entries (with keys) */
1293 size += MAXALIGN(my_log2(NLOCKENTS) * sizeof(void *));
1294 size += MAXALIGN(sizeof(HHDR));
1295 size += nLockSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1296 size += NLOCKENTS * /* XXX not multiple of BUCKET_ALLOC_INCR? */
1297 (MAXALIGN(sizeof(BUCKET_INDEX)) +
1298 MAXALIGN(sizeof(LOCK))); /* contains hash key */
/* xid hash: same layout, sized per backend */
1300 size += MAXALIGN(my_log2(NBACKENDS) * sizeof(void *));
1301 size += MAXALIGN(sizeof(HHDR));
1302 size += nXidSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1303 size += NBACKENDS * /* XXX not multiple of BUCKET_ALLOC_INCR? */
1304 (MAXALIGN(sizeof(BUCKET_INDEX)) +
1305 MAXALIGN(sizeof(XIDLookupEnt))); /* contains hash key */
1310 /* -----------------
1311 * Boolean function to determine current locking status
/* Accessor for the module-private LockingIsDisabled flag. */
1317 return LockingIsDisabled;