]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
I have written some patches to the postgres lock manager which allow the
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c--
4  *    simple lock acquisition
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.3 1996/10/11 03:22:56 scrappy Exp $
11  *
12  * NOTES
13  *    Outside modules can create a lock table and acquire/release
14  *    locks.  A lock table is a shared memory hash table.  When
15  *    a process tries to acquire a lock of a type that conflicts
16  *    with existing locks, it is put to sleep using the routines
17  *    in storage/lmgr/proc.c.
18  *
19  *  Interface:
20  *
21  *  LockAcquire(), LockRelease(), LockTabInit().
22  *
23  *  LockReplace() is called only within this module and by the
24  *      lkchain module.  It releases a lock without looking
25  *      the lock up in the lock table.
26  *
27  *  NOTE: This module is used to define new lock tables.  The
28  *      multi-level lock table (multi.c) used by the heap
29  *      access methods calls these routines.  See multi.c for
30  *      examples showing how to use this interface.
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include <stdio.h>              /* for sprintf() */
35 #include "storage/shmem.h"
36 #include "storage/spin.h"
37 #include "storage/proc.h"
38 #include "storage/lock.h"
39 #include "utils/hsearch.h"
40 #include "utils/elog.h"
41 #include "utils/palloc.h"
42 #include "access/xact.h"
43
44 /*#define LOCK_MGR_DEBUG*/
45
46 #ifndef LOCK_MGR_DEBUG
47
48 #define LOCK_PRINT(where,tag,type)
49 #define LOCK_DUMP(where,lock,type)
50 #define XID_PRINT(where,xidentP)
51
52 #else /* LOCK_MGR_DEBUG */
53
54 #define LOCK_PRINT(where,tag,type)\
55   elog(NOTICE, "%s: rel (%d) dbid (%d) tid (%d,%d) type (%d)\n",where, \
56          tag->relId, tag->dbId, \
57          ( (tag->tupleId.ip_blkid.data[0] >= 0) ? \
58                 BlockIdGetBlockNumber(&tag->tupleId.ip_blkid) : -1 ), \
59          tag->tupleId.ip_posid, \
60          type);
61
62 #define LOCK_DUMP(where,lock,type)\
63   elog(NOTICE, "%s: rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) holders (%d,%d,%d,%d,%d) type (%d)\n",where, \
64        lock->tag.relId, lock->tag.dbId, \
65        ((lock->tag.tupleId.ip_blkid.data[0] >= 0) ? \
66         BlockIdGetBlockNumber(&lock->tag.tupleId.ip_blkid) : -1 ), \
67        lock->tag.tupleId.ip_posid, \
68        lock->nHolding,\
69        lock->holders[1],\
70        lock->holders[2],\
71        lock->holders[3],\
72        lock->holders[4],\
73        lock->holders[5],\
74        type);
75
76 #define XID_PRINT(where,xidentP)\
77   elog(NOTICE,\
78        "%s:xid (%d) pid (%d) lock (%x) nHolding (%d) holders (%d,%d,%d,%d,%d)",\
79        where,\
80        xidentP->tag.xid,\
81        xidentP->tag.pid,\
82        xidentP->tag.lock,\
83        xidentP->nHolding,\
84        xidentP->holders[1],\
85        xidentP->holders[2],\
86        xidentP->holders[3],\
87        xidentP->holders[4],\
88        xidentP->holders[5]);
89
90 #endif /* LOCK_MGR_DEBUG */
91
92 SPINLOCK LockMgrLock;           /* in Shmem or created in CreateSpinlocks() */
93
94 /* This is to simplify/speed up some bit arithmetic */
95
96 static MASK     BITS_OFF[MAX_LOCKTYPES];
97 static MASK     BITS_ON[MAX_LOCKTYPES];
98
99 /* -----------------
100  * XXX Want to move this to this file
101  * -----------------
102  */
103 static bool LockingIsDisabled;
104
105 /* -------------------
106  * map from tableId to the lock table structure
107  * -------------------
108  */
109 static LOCKTAB *AllTables[MAX_TABLES];
110
111 /* -------------------
112  * no zero-th table
113  * -------------------
114  */
115 static int      NumTables = 1;
116
117 /* -------------------
118  * InitLocks -- Init the lock module.  Create a private data
119  *      structure for constructing conflict masks.
120  * -------------------
121  */
122 void
123 InitLocks()
124 {
125     int i;
126     int bit;
127     
128     bit = 1;
129     /* -------------------
130      * remember 0th locktype is invalid
131      * -------------------
132      */
133     for (i=0;i<MAX_LOCKTYPES;i++,bit <<= 1)
134         {
135             BITS_ON[i] = bit;
136             BITS_OFF[i] = ~bit;
137         }
138 }
139
140 /* -------------------
141  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
142  * ------------------
143  */
144 void
145 LockDisable(int status)
146 {
147     LockingIsDisabled = status;
148 }
149
150
151 /*
152  * LockTypeInit -- initialize the lock table's lock type
153  *      structures
154  *
155  * Notes: just copying.  Should only be called once.
156  */
157 static void
158 LockTypeInit(LOCKTAB *ltable,
159              MASK *conflictsP,
160              int *prioP,
161              int ntypes)
162 {
163     int i;
164     
165     ltable->ctl->nLockTypes = ntypes;
166     ntypes++;
167     for (i=0;i<ntypes;i++,prioP++,conflictsP++)
168         {
169             ltable->ctl->conflictTab[i] = *conflictsP;
170             ltable->ctl->prio[i] = *prioP;
171         }
172 }
173
174 /*
175  * LockTabInit -- initialize a lock table structure
176  *
177  * Notes:
178  *      (a) a lock table has four separate entries in the binding
179  *      table.  This is because every shared hash table and spinlock
180  *      has its name stored in the binding table at its creation.  It
181  *      is wasteful, in this case, but not much space is involved.
182  *
183  */
184 LockTableId
185 LockTabInit(char *tabName,
186             MASK *conflictsP,
187             int *prioP,
188             int ntypes)
189 {
190     LOCKTAB *ltable;
191     char *shmemName;
192     HASHCTL info;
193     int hash_flags;
194     bool        found;
195     int status = TRUE;
196     
197     if (ntypes > MAX_LOCKTYPES)
198         {
199             elog(NOTICE,"LockTabInit: too many lock types %d greater than %d",
200                  ntypes,MAX_LOCKTYPES);
201             return(INVALID_TABLEID);
202         }
203     
204     if (NumTables > MAX_TABLES)
205         {
206             elog(NOTICE,
207                  "LockTabInit: system limit of MAX_TABLES (%d) lock tables",
208                  MAX_TABLES);
209             return(INVALID_TABLEID);
210         }
211     
212     /* allocate a string for the binding table lookup */
213     shmemName = (char *) palloc((unsigned)(strlen(tabName)+32));
214     if (! shmemName)
215         {
216             elog(NOTICE,"LockTabInit: couldn't malloc string %s \n",tabName);
217             return(INVALID_TABLEID);
218         }
219     
220     /* each lock table has a non-shared header */
221     ltable = (LOCKTAB *) palloc((unsigned) sizeof(LOCKTAB));
222     if (! ltable)
223         {
224             elog(NOTICE,"LockTabInit: couldn't malloc lock table %s\n",tabName);
225             (void) pfree (shmemName);
226             return(INVALID_TABLEID);
227         }
228     
229     /* ------------------------
230      * find/acquire the spinlock for the table
231      * ------------------------
232      */
233     SpinAcquire(LockMgrLock);
234     
235     
236     /* -----------------------
237      * allocate a control structure from shared memory or attach to it
238      * if it already exists.
239      * -----------------------
240      */
241     sprintf(shmemName,"%s (ctl)",tabName);
242     ltable->ctl = (LOCKCTL *)
243         ShmemInitStruct(shmemName,(unsigned)sizeof(LOCKCTL),&found);
244     
245     if (! ltable->ctl)
246         {
247             elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
248             status = FALSE;
249         }
250     
251     /* ----------------
252      * we're first - initialize
253      * ----------------
254      */
255     if (! found)
256         {
257             memset(ltable->ctl, 0, sizeof(LOCKCTL)); 
258             ltable->ctl->masterLock = LockMgrLock;
259             ltable->ctl->tableId = NumTables;
260         }
261     
262     /* --------------------
263      * other modules refer to the lock table by a tableId
264      * --------------------
265      */
266     AllTables[NumTables] = ltable;
267     NumTables++;
268     Assert(NumTables <= MAX_TABLES);
269     
270     /* ----------------------
271      * allocate a hash table for the lock tags.  This is used
272      * to find the different locks.
273      * ----------------------
274      */
275     info.keysize =  sizeof(LOCKTAG);
276     info.datasize = sizeof(LOCK);
277     info.hash = tag_hash;
278     hash_flags = (HASH_ELEM | HASH_FUNCTION);
279     
280     sprintf(shmemName,"%s (lock hash)",tabName);
281     ltable->lockHash = (HTAB *) ShmemInitHash(shmemName,
282                                               INIT_TABLE_SIZE,MAX_TABLE_SIZE,
283                                               &info,hash_flags);
284     
285     Assert( ltable->lockHash->hash == tag_hash);
286     if (! ltable->lockHash)
287         {
288             elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
289             status = FALSE;
290         }
291     
292     /* -------------------------
293      * allocate an xid table.  When different transactions hold
294      * the same lock, additional information must be saved (locks per tx).
295      * -------------------------
296      */
297     info.keysize = XID_TAGSIZE;
298     info.datasize = sizeof(XIDLookupEnt);
299     info.hash = tag_hash;
300     hash_flags = (HASH_ELEM | HASH_FUNCTION);
301     
302     sprintf(shmemName,"%s (xid hash)",tabName);
303     ltable->xidHash = (HTAB *) ShmemInitHash(shmemName,
304                                              INIT_TABLE_SIZE,MAX_TABLE_SIZE,
305                                              &info,hash_flags);
306     
307     if (! ltable->xidHash)
308         {
309             elog(FATAL,"LockTabInit: couldn't initialize %s",tabName);
310             status = FALSE;
311         }
312     
313     /* init ctl data structures */
314     LockTypeInit(ltable, conflictsP, prioP, ntypes);
315     
316     SpinRelease(LockMgrLock);
317     
318     (void) pfree (shmemName);
319     
320     if (status)
321         return(ltable->ctl->tableId);
322     else
323         return(INVALID_TABLEID);
324 }
325
326 /*
327  * LockTabRename -- allocate another tableId to the same
328  *      lock table.
329  *
330  * NOTES: Both the lock module and the lock chain (lchain.c)
331  *      module use table id's to distinguish between different
332  *      kinds of locks.  Short term and long term locks look
333  *      the same to the lock table, but are handled differently
334  *      by the lock chain manager.  This function allows the
335  *      client to use different tableIds when acquiring/releasing
336  *      short term and long term locks.
337  */
338 LockTableId
339 LockTabRename(LockTableId tableId)
340 {
341     LockTableId newTableId;
342     
343     if (NumTables >= MAX_TABLES)
344         {
345             return(INVALID_TABLEID);
346         }
347     if (AllTables[tableId] == INVALID_TABLEID)
348         {
349             return(INVALID_TABLEID);
350         }
351     
352     /* other modules refer to the lock table by a tableId */
353     newTableId = NumTables;
354     NumTables++;
355     
356     AllTables[newTableId] = AllTables[tableId];
357     return(newTableId);
358 }
359
360 /*
361  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
362  *      set lock if/when no conflicts.
363  *
364  * Returns: TRUE if parameters are correct, FALSE otherwise.
365  *
366  * Side Effects: The lock is always acquired.  No way to abort
367  *      a lock acquisition other than aborting the transaction.
368  *      Lock is recorded in the lkchain.
369 #ifdef USER_LOCKS
370  * Note on User Locks: 
371  *      User locks are handled totally on the application side as
372  *      long term cooperative locks which extend beyond the normal
373  *      transaction boundaries.  Their purpose is to indicate to an
374  *      application that someone is `working' on an item.  So it is
375  *      possible to put an user lock on a tuple's oid, retrieve the
376  *      tuple, work on it for an hour and then update it and remove
377  *      the lock.  While the lock is active other clients can still
378  *      read and write the tuple but they can be aware that it has
379  *      been locked at the application level by someone.
380  *      User locks use lock tags made of an uint16 and an uint32, for
381  *      example 0 and a tuple oid, or any other arbitrary pair of
382  *      numbers following a convention established by the application.
383  *      In this sense tags don't refer to tuples or database entities.
384  *      User locks and normal locks are completely orthogonal and
385  *      they don't interfere with each other, so it is possible
386  *      to acquire a normal lock on an user-locked tuple or user-lock
387  *      a tuple for which a normal write lock already exists.
388  *      User locks are always non blocking, therefore they are never
389  *      acquired if already held by another process.  They must be
390  *      released explicitly by the application but they are released
391  *      automatically when a backend terminates.
392  *      They are indicated by a dummy tableId 0 which doesn't have
393  *      any table allocated but uses the normal lock table, and are
394  *      distinguished from normal locks for the following differences:
395  *
396  *                                      normal lock     user lock
397  *
398  *      tableId                         1               0
399  *      tag.relId                       rel oid         0
400  *      tag.ItemPointerData.ip_blkid    block id        lock id2
401  *      tag.ItemPointerData.ip_posid    tuple offset    lock id1
402  *      xid.pid                         0               backend pid
403  *      xid.xid                         current xid     0
404  *      persistence                     transaction     user or backend
405  *
406  *      The lockt parameter can have the same values for normal locks
407  *      although probably only WRITE_LOCK can have some practical use.
408  *
409  *                                                      DZ - 4 Oct 1996
410 #endif
411  */
412 bool
413 LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
414 {
415     XIDLookupEnt        *result,item;
416     HTAB                *xidTable;
417     bool        found;
418     LOCK                *lock = NULL;
419     SPINLOCK    masterLock;
420     LOCKTAB     *ltable;
421     int                 status;
422     TransactionId       myXid;
423     
424 #ifdef USER_LOCKS
425     int is_user_lock;
426
427     is_user_lock = (tableId == 0);
428     if (is_user_lock) {
429         tableId = 1;
430 #ifdef USER_LOCKS_DEBUG
431         elog(NOTICE,"LockAcquire: user lock tag [%u,%u] %d",
432              lockName->tupleId.ip_posid,
433              ((lockName->tupleId.ip_blkid.bi_hi<<16)+
434               lockName->tupleId.ip_blkid.bi_lo),
435              lockt);
436 #endif
437     }
438 #endif
439
440     Assert (tableId < NumTables);
441     ltable = AllTables[tableId];
442     if (!ltable)
443         {
444             elog(NOTICE,"LockAcquire: bad lock table %d",tableId);
445             return  (FALSE);
446         }
447     
448     if (LockingIsDisabled)
449         {
450             return(TRUE);
451         }
452     
453     LOCK_PRINT("Acquire",lockName,lockt);
454     masterLock = ltable->ctl->masterLock;
455     
456     SpinAcquire(masterLock);
457     
458     Assert( ltable->lockHash->hash == tag_hash);
459     lock = (LOCK *)hash_search(ltable->lockHash,(Pointer)lockName,HASH_ENTER,&found);
460     
461     if (! lock)
462         {
463             SpinRelease(masterLock);
464             elog(FATAL,"LockAcquire: lock table %d is corrupted",tableId);
465             return(FALSE);
466         }
467     
468     /* --------------------
469      * if there was nothing else there, complete initialization
470      * --------------------
471      */
472     if  (! found)
473         {
474             lock->mask = 0;
475             ProcQueueInit(&(lock->waitProcs));
476             memset((char *)lock->holders, 0, sizeof(int)*MAX_LOCKTYPES);
477             memset((char *)lock->activeHolders, 0, sizeof(int)*MAX_LOCKTYPES);
478             lock->nHolding = 0;
479             lock->nActive = 0;
480             
481             Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
482                                  &(lockName->tupleId.ip_blkid)));
483             
484         }
485     
486     /* ------------------
487      * add an element to the lock queue so that we can clear the
488      * locks at end of transaction.
489      * ------------------
490      */
491     xidTable = ltable->xidHash;
492     myXid = GetCurrentTransactionId();
493     
494     /* ------------------
495      * Zero out all of the tag bytes (this clears the padding bytes for long
496      * word alignment and ensures hashing consistency).
497      * ------------------
498      */
499     memset(&item, 0, XID_TAGSIZE); 
500     TransactionIdStore(myXid, &item.tag.xid);
501     item.tag.lock = MAKE_OFFSET(lock);
502 #if 0
503     item.tag.pid = MyPid;
504 #endif
505     
506 #ifdef USER_LOCKS
507     if (is_user_lock) {
508         item.tag.pid = getpid();
509         item.tag.xid = myXid = 0;
510 #ifdef USER_LOCKS_DEBUG
511         elog(NOTICE,"LockAcquire: user lock xid [%d,%d,%d]",
512              item.tag.lock, item.tag.pid, item.tag.xid);
513 #endif
514     }
515 #endif
516
517     result = (XIDLookupEnt *)hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found);
518     if (!result)
519         {
520             elog(NOTICE,"LockAcquire: xid table corrupted");
521             return(STATUS_ERROR);
522         }
523     if (!found)
524         {
525             XID_PRINT("queueing XidEnt LockAcquire:", result);
526             ProcAddLock(&result->queue);
527             result->nHolding = 0;
528             memset((char *)result->holders, 0, sizeof(int)*MAX_LOCKTYPES);
529         }
530     
531     /* ----------------
532      * lock->nholding tells us how many processes have _tried_ to
533      * acquire this lock,  Regardless of whether they succeeded or
534      * failed in doing so.
535      * ----------------
536      */
537     lock->nHolding++;
538     lock->holders[lockt]++;
539     
540     /* --------------------
541      * If I'm the only one holding a lock, then there
542      * cannot be a conflict.  Need to subtract one from the
543      * lock's count since we just bumped the count up by 1 
544      * above.
545      * --------------------
546      */
547     if (result->nHolding == lock->nActive)
548         {
549             result->holders[lockt]++;
550             result->nHolding++;
551             GrantLock(lock, lockt);
552             SpinRelease(masterLock);
553             return(TRUE);
554         }
555     
556     Assert(result->nHolding <= lock->nActive);
557     
558     status = LockResolveConflicts(ltable, lock, lockt, myXid);
559     
560     if (status == STATUS_OK)
561         {
562             GrantLock(lock, lockt);
563         }
564     else if (status == STATUS_FOUND)
565         {
566 #ifdef USER_LOCKS
567             /*
568              * User locks are non blocking. If we can't acquire a lock
569              * remove the xid entry and return FALSE without waiting.
570              */
571             if (is_user_lock) {
572                 if (!result->nHolding) {
573                     SHMQueueDelete(&result->queue);
574                     hash_search(xidTable, (Pointer)&item, HASH_REMOVE, &found);
575                 }
576                 lock->nHolding--;
577                 lock->holders[lockt]--;
578                 SpinRelease(masterLock);
579 #ifdef USER_LOCKS_DEBUG
580                 elog(NOTICE,"LockAcquire: user lock failed");
581 #endif
582                 return(FALSE);
583             }
584 #endif
585             status = WaitOnLock(ltable, tableId, lock, lockt);
586             XID_PRINT("Someone granted me the lock", result);
587         }
588     
589     SpinRelease(masterLock);
590     
591     return(status == STATUS_OK);
592 }
593
594 /* ----------------------------
595  * LockResolveConflicts -- test for lock conflicts
596  *
597  * NOTES:
598  *      Here's what makes this complicated: one transaction's
599  * locks don't conflict with one another.  When many processes
600  * hold locks, each has to subtract off the other's locks when
601  * determining whether or not any new lock acquired conflicts with
602  * the old ones.
603  *
604  *  For example, if I am already holding a WRITE_INTENT lock,
605  *  there will not be a conflict with my own READ_LOCK.  If I
606  *  don't consider the intent lock when checking for conflicts,
607  *  I find no conflict.
608  * ----------------------------
609  */
610 int
611 LockResolveConflicts(LOCKTAB *ltable,
612                      LOCK *lock,
613                      LOCKT lockt,
614                      TransactionId xid)
615 {
616     XIDLookupEnt        *result,item;
617     int         *myHolders;
618     int         nLockTypes;
619     HTAB                *xidTable;
620     bool        found;
621     int         bitmask;
622     int                 i,tmpMask;
623     
624     nLockTypes = ltable->ctl->nLockTypes;
625     xidTable = ltable->xidHash;
626     
627     /* ---------------------
628      * read my own statistics from the xid table.  If there
629      * isn't an entry, then we'll just add one.
630      *
631      * Zero out the tag, this clears the padding bytes for long
632      * word alignment and ensures hashing consistency.
633      * ------------------
634      */
635     memset(&item, 0, XID_TAGSIZE);
636     TransactionIdStore(xid, &item.tag.xid);
637     item.tag.lock = MAKE_OFFSET(lock);
638 #if 0
639     item.tag.pid = pid;
640 #endif
641     
642     if (! (result = (XIDLookupEnt *)
643            hash_search(xidTable, (Pointer)&item, HASH_ENTER, &found)))
644         {
645             elog(NOTICE,"LockResolveConflicts: xid table corrupted");
646             return(STATUS_ERROR);
647         }
648     myHolders = result->holders;
649     
650     if (! found)
651         {
652             /* ---------------
653              * we're not holding any type of lock yet.  Clear
654              * the lock stats.
655              * ---------------
656              */
657             memset(result->holders, 0, nLockTypes * sizeof(*(lock->holders))); 
658             result->nHolding = 0;
659         }
660     
661     /* ----------------------------
662      * first check for global conflicts: If no locks conflict
663      * with mine, then I get the lock.
664      *
665      * Checking for conflict: lock->mask represents the types of
666      * currently held locks.  conflictTable[lockt] has a bit
667      * set for each type of lock that conflicts with mine.  Bitwise
668      * compare tells if there is a conflict.
669      * ----------------------------
670      */
671     if (! (ltable->ctl->conflictTab[lockt] & lock->mask))
672         {
673             
674             result->holders[lockt]++;
675             result->nHolding++;
676             
677             XID_PRINT("Conflict Resolved: updated xid entry stats", result);
678             
679             return(STATUS_OK);
680         }
681     
682     /* ------------------------
683      * Rats.  Something conflicts. But it could still be my own
684      * lock.  We have to construct a conflict mask
685      * that does not reflect our own locks.
686      * ------------------------
687      */
688     bitmask = 0;
689     tmpMask = 2;
690     for (i=1;i<=nLockTypes;i++, tmpMask <<= 1)
691         {
692             if (lock->activeHolders[i] - myHolders[i])
693                 {
694                     bitmask |= tmpMask;
695                 }
696         }
697     
698     /* ------------------------
699      * now check again for conflicts.  'bitmask' describes the types
700      * of locks held by other processes.  If one of these
701      * conflicts with the kind of lock that I want, there is a
702      * conflict and I have to sleep.
703      * ------------------------
704      */
705     if (! (ltable->ctl->conflictTab[lockt] & bitmask))
706         {
707             
708             /* no conflict. Get the lock and go on */
709             
710             result->holders[lockt]++;
711             result->nHolding++;
712             
713             XID_PRINT("Conflict Resolved: updated xid entry stats", result);
714             
715             return(STATUS_OK);
716             
717         }
718     
719     return(STATUS_FOUND);
720 }
721
722 int
723 WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock, LOCKT lockt)
724 {
725     PROC_QUEUE *waitQueue = &(lock->waitProcs);
726     
727     int prio = ltable->ctl->prio[lockt];
728     
729     /* the waitqueue is ordered by priority. I insert myself
730      * according to the priority of the lock I am acquiring.
731      *
732      * SYNC NOTE: I am assuming that the lock table spinlock
733      * is sufficient synchronization for this queue.  That
734      * will not be true if/when people can be deleted from
735      * the queue by a SIGINT or something.
736      */
737     LOCK_DUMP("WaitOnLock: sleeping on lock", lock, lockt);
738     if (ProcSleep(waitQueue,
739                   ltable->ctl->masterLock,
740                   lockt,
741                   prio,
742                   lock) != NO_ERROR)
743         {
744             /* -------------------
745              * This could have happend as a result of a deadlock, see HandleDeadLock()
746              * Decrement the lock nHolding and holders fields as we are no longer 
747              * waiting on this lock.
748              * -------------------
749              */
750             lock->nHolding--;
751             lock->holders[lockt]--;
752             LOCK_DUMP("WaitOnLock: aborting on lock", lock, lockt);
753             SpinRelease(ltable->ctl->masterLock);
754             elog(WARN,"WaitOnLock: error on wakeup - Aborting this transaction");
755         }
756     
757     return(STATUS_OK);
758 }
759
760 /*
761  * LockRelease -- look up 'lockName' in lock table 'tableId' and
762  *      release it.
763  *
764  * Side Effects: if the lock no longer conflicts with the highest
765  *      priority waiting process, that process is granted the lock
766  *      and awoken. (We have to grant the lock here to avoid a
767  *      race between the waking process and any new process to
768  *      come along and request the lock).
769  */
770 bool
771 LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
772 {
773     LOCK                *lock = NULL;
774     SPINLOCK    masterLock;
775     bool        found;
776     LOCKTAB     *ltable;
777     XIDLookupEnt        *result,item;
778     HTAB                *xidTable;
779     bool                wakeupNeeded = true;
780     
781 #ifdef USER_LOCKS
782     int is_user_lock;
783
784     is_user_lock = (tableId == 0);
785     if (is_user_lock) {
786         tableId = 1;
787 #ifdef USER_LOCKS_DEBUG
788         elog(NOTICE,"LockRelease: user lock tag [%u,%u] %d",
789              lockName->tupleId.ip_posid,
790              ((lockName->tupleId.ip_blkid.bi_hi<<16)+
791               lockName->tupleId.ip_blkid.bi_lo),
792              lockt);
793 #endif
794     }
795 #endif
796
797     Assert (tableId < NumTables);
798     ltable = AllTables[tableId];
799     if (!ltable) {
800         elog(NOTICE, "ltable is null in LockRelease");
801         return (FALSE);
802     }
803     
804     if (LockingIsDisabled)
805         {
806             return(TRUE);
807         }
808     
809     LOCK_PRINT("Release",lockName,lockt);
810     
811     masterLock = ltable->ctl->masterLock;
812     xidTable = ltable->xidHash;
813     
814     SpinAcquire(masterLock);
815     
816     Assert( ltable->lockHash->hash == tag_hash);
817     lock = (LOCK *)
818         hash_search(ltable->lockHash,(Pointer)lockName,HASH_FIND_SAVE,&found);
819     
820 #ifdef USER_LOCKS
821     /*
822      * If the entry is not found hash_search returns TRUE
823      * instead of NULL, so we must check it explicitly.
824      */
825     if ((is_user_lock) && (lock == (LOCK *)TRUE)) {
826         SpinRelease(masterLock);
827         elog(NOTICE,"LockRelease: there are no locks with this tag");
828         return(FALSE);
829     }
830 #endif
831
832     /* let the caller print its own error message, too.
833      * Do not elog(WARN).
834      */
835     if (! lock)
836         {
837             SpinRelease(masterLock);
838             elog(NOTICE,"LockRelease: locktable corrupted");
839             return(FALSE);
840         }
841     
842     if (! found)
843         {
844             SpinRelease(masterLock);
845             elog(NOTICE,"LockRelease: locktable lookup failed, no lock");
846             return(FALSE);
847         }
848     
849     Assert(lock->nHolding > 0);
850     
851 #ifdef USER_LOCKS
852     /*
853      * If this is an user lock it can be removed only after
854      * checking that it was acquired by the current process,
855      * so this code is skipped and executed later.
856      */
857   if (!is_user_lock) {
858 #endif
859     /*
860      * fix the general lock stats
861      */
862     lock->nHolding--;
863     lock->holders[lockt]--;
864     lock->nActive--;
865     lock->activeHolders[lockt]--;
866     
867     Assert(lock->nActive >= 0);
868     
869     if (! lock->nHolding)
870         {
871             /* ------------------
872              * if there's no one waiting in the queue,
873              * we just released the last lock.
874              * Delete it from the lock table.
875              * ------------------
876              */
877             Assert( ltable->lockHash->hash == tag_hash);
878             lock = (LOCK *) hash_search(ltable->lockHash,
879                                         (Pointer) &(lock->tag),
880                                         HASH_REMOVE_SAVED,
881                                         &found);
882             Assert(lock && found);
883             wakeupNeeded = false;
884         }
885 #ifdef USER_LOCKS
886   }
887 #endif
888     
889     /* ------------------
890      * Zero out all of the tag bytes (this clears the padding bytes for long
891      * word alignment and ensures hashing consistency).
892      * ------------------
893      */
894     memset(&item, 0, XID_TAGSIZE);
895     
896     TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
897     item.tag.lock = MAKE_OFFSET(lock);
898 #if 0
899     item.tag.pid = MyPid;
900 #endif
901     
902 #ifdef USER_LOCKS
903     if (is_user_lock) {
904         item.tag.pid = getpid();
905         item.tag.xid = 0;
906 #ifdef USER_LOCKS_DEBUG
907         elog(NOTICE,"LockRelease: user lock xid [%d,%d,%d]",
908              item.tag.lock, item.tag.pid, item.tag.xid);
909 #endif
910     }
911 #endif
912
913     if (! ( result = (XIDLookupEnt *) hash_search(xidTable,
914                                                   (Pointer)&item,
915                                                   HASH_FIND_SAVE,
916                                                   &found) )
917         || !found)
918         {
919             SpinRelease(masterLock);
920 #ifdef USER_LOCKS
921             if ((is_user_lock) && (result)) {
922                 elog(NOTICE,"LockRelease: you don't have a lock on this tag");
923             } else {
924                 elog(NOTICE,"LockRelease: find xid, table corrupted");
925             }
926 #else
927             elog(NOTICE,"LockReplace: xid table corrupted");
928 #endif
929             return(FALSE);
930         }
931     /*
932      * now check to see if I have any private locks.  If I do,
933      * decrement the counts associated with them.
934      */
935     result->holders[lockt]--;
936     result->nHolding--;
937     
938     XID_PRINT("LockRelease updated xid stats", result);
939     
940     /*
941      * If this was my last hold on this lock, delete my entry
942      * in the XID table.
943      */
944     if (! result->nHolding)
945         {
946 #ifdef USER_LOCKS
947             if (result->queue.prev == INVALID_OFFSET) {
948                 elog(NOTICE,"LockRelease: xid.prev == INVALID_OFFSET");
949             }
950             if (result->queue.next == INVALID_OFFSET) {
951                 elog(NOTICE,"LockRelease: xid.next == INVALID_OFFSET");
952             }
953 #endif
954             if (result->queue.next != INVALID_OFFSET)
955                 SHMQueueDelete(&result->queue);
956             if (! (result = (XIDLookupEnt *)
957                    hash_search(xidTable, (Pointer)&item, HASH_REMOVE_SAVED, &found)) ||
958                 ! found)
959                 {
960                     SpinRelease(masterLock);
961 #ifdef USER_LOCKS
962                     elog(NOTICE,"LockRelease: remove xid, table corrupted");
963 #else
964                     elog(NOTICE,"LockReplace: xid table corrupted");
965 #endif
966                     return(FALSE);
967                 }
968         }
969     
970 #ifdef USER_LOCKS
971     /*
972      * If this is an user lock remove it now, after the
973      * corresponding xid entry has been found and deleted.
974      */
975   if (is_user_lock) {
976     /*
977      * fix the general lock stats
978      */
979     lock->nHolding--;
980     lock->holders[lockt]--;
981     lock->nActive--;
982     lock->activeHolders[lockt]--;
983     
984     Assert(lock->nActive >= 0);
985     
986     if (! lock->nHolding)
987         {
988             /* ------------------
989              * if there's no one waiting in the queue,
990              * we just released the last lock.
991              * Delete it from the lock table.
992              * ------------------
993              */
994             Assert( ltable->lockHash->hash == tag_hash);
995             lock = (LOCK *) hash_search(ltable->lockHash,
996                                         (Pointer) &(lock->tag),
997                                         HASH_REMOVE,
998                                         &found);
999             Assert(lock && found);
1000             wakeupNeeded = false;
1001         }
1002   }
1003 #endif
1004
1005     /* --------------------------
1006      * If there are still active locks of the type I just released, no one
1007      * should be woken up.  Whoever is asleep will still conflict
1008      * with the remaining locks.
1009      * --------------------------
1010      */
1011     if (! (lock->activeHolders[lockt]))
1012         {
1013             /* change the conflict mask.  No more of this lock type. */
1014             lock->mask &= BITS_OFF[lockt];
1015         }
1016     
1017     if (wakeupNeeded)
1018         {
1019             /* --------------------------
1020              * Wake the first waiting process and grant him the lock if it
1021              * doesn't conflict.  The woken process must record the lock
1022              * himself.
1023              * --------------------------
1024              */
1025             (void) ProcLockWakeup(&(lock->waitProcs), (char *) ltable, (char *) lock);
1026         }
1027     
1028     SpinRelease(masterLock);
1029     return(TRUE);
1030 }
1031
1032 /*
1033  * GrantLock -- udpate the lock data structure to show
1034  *      the new lock holder.
1035  */
1036 void
1037 GrantLock(LOCK *lock, LOCKT lockt)
1038 {
1039     lock->nActive++;
1040     lock->activeHolders[lockt]++;
1041     lock->mask |= BITS_ON[lockt];
1042 }
1043
1044 #ifdef USER_LOCKS
1045 /*
1046  * LockReleaseAll -- Release all locks in a process lock queue.
1047  *
1048  * Note: This code is a little complicated by the presence in the
1049  *       same queue of user locks which can't be removed from the
1050  *       normal lock queue at the end of a transaction. They must
1051  *       however be removed when the backend exits.
1052  *       A dummy tableId 0 is used to indicate that we are releasing
1053  *       the user locks, from the code added to ProcKill().
1054  */
1055 #endif
1056 bool
1057 LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
1058 {
1059     PROC_QUEUE  *waitQueue;
1060     int         done;
1061     XIDLookupEnt        *xidLook = NULL;
1062     XIDLookupEnt        *tmp = NULL;
1063     SHMEM_OFFSET        end = MAKE_OFFSET(lockQueue);
1064     SPINLOCK    masterLock;
1065     LOCKTAB     *ltable;
1066     int         i,nLockTypes;
1067     LOCK                *lock;
1068     bool        found;
1069     
1070 #ifdef USER_LOCKS
1071     int is_user_lock_table, my_pid, count, nskip;
1072
1073     is_user_lock_table = (tableId == 0);
1074     my_pid = getpid();
1075 #ifdef USER_LOCKS_DEBUG
1076     elog(NOTICE,"LockReleaseAll: tableId=%d, pid=%d", tableId, my_pid);
1077 #endif
1078     if (is_user_lock_table) {
1079         tableId = 1;
1080     }
1081 #endif
1082
1083     Assert (tableId < NumTables);
1084     ltable = AllTables[tableId];
1085     if (!ltable)
1086         return (FALSE);
1087     
1088     nLockTypes = ltable->ctl->nLockTypes;
1089     masterLock = ltable->ctl->masterLock;
1090     
1091     if (SHMQueueEmpty(lockQueue))
1092         return TRUE;
1093     
1094 #ifdef USER_LOCKS
1095     SpinAcquire(masterLock);
1096 #endif
1097     SHMQueueFirst(lockQueue,(Pointer*)&xidLook,&xidLook->queue);
1098     
1099     XID_PRINT("LockReleaseAll:", xidLook);
1100     
1101 #ifndef USER_LOCKS
1102     SpinAcquire(masterLock);
1103 #else
1104     count = nskip = 0;
1105 #endif
1106     for (;;)
1107         {
1108             /* ---------------------------
1109              * XXX Here we assume the shared memory queue is circular and
1110              * that we know its internal structure.  Should have some sort of
1111              * macros to allow one to walk it.  mer 20 July 1991
1112              * ---------------------------
1113              */
1114             done = (xidLook->queue.next == end);
1115             lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1116             
1117             LOCK_PRINT("ReleaseAll",(&lock->tag),0);
1118             
1119 #ifdef USER_LOCKS
1120             /*
1121              * Sometimes the queue appears to be messed up.
1122              */
1123             if (count++ > 2000) {
1124                 elog(NOTICE,"LockReleaseAll: xid loop detected, giving up");
1125                 nskip = 0;
1126                 break;
1127             }
1128             if (is_user_lock_table) {
1129                 if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0)) {
1130 #ifdef USER_LOCKS_DEBUG
1131                     elog(NOTICE,"LockReleaseAll: skip normal lock [%d,%d,%d]",
1132                          xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1133 #endif
1134                     nskip++;
1135                     goto next_item;
1136                 }
1137                 if (xidLook->tag.pid != my_pid) {
1138                     /* This should never happen */
1139 #ifdef USER_LOCKS_DEBUG
1140                     elog(NOTICE,
1141                          "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
1142                          lock->tag.tupleId.ip_posid,
1143                          ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
1144                           lock->tag.tupleId.ip_blkid.bi_lo),
1145                          xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1146 #endif
1147                     nskip++;
1148                     goto next_item;
1149                 }
1150 #ifdef USER_LOCKS_DEBUG
1151                 elog(NOTICE,
1152                      "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
1153                      lock->tag.tupleId.ip_posid,
1154                      ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
1155                       lock->tag.tupleId.ip_blkid.bi_lo),
1156                      xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1157 #endif
1158             } else {
1159                 if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0)) {
1160 #ifdef USER_LOCKS_DEBUG
1161                     elog(NOTICE,
1162                          "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
1163                          lock->tag.tupleId.ip_posid,
1164                          ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+
1165                           lock->tag.tupleId.ip_blkid.bi_lo),
1166                          xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1167 #endif
1168                     nskip++;
1169                     goto next_item;
1170                 }
1171 #ifdef USER_LOCKS_DEBUG
1172                 elog(NOTICE,"LockReleaseAll: release normal lock [%d,%d,%d]",
1173                      xidLook->tag.lock,xidLook->tag.pid,xidLook->tag.xid);
1174 #endif
1175             }
1176 #endif
1177
1178             /* ------------------
1179              * fix the general lock stats
1180              * ------------------
1181              */
1182             if (lock->nHolding != xidLook->nHolding)
1183                 {
1184                     lock->nHolding -= xidLook->nHolding;
1185                     lock->nActive -= xidLook->nHolding;
1186                     Assert(lock->nActive >= 0);
1187                     for (i=1; i<=nLockTypes; i++)
1188                         {
1189                             lock->holders[i] -= xidLook->holders[i];
1190                             lock->activeHolders[i] -= xidLook->holders[i];
1191                             if (! lock->activeHolders[i])
1192                                 lock->mask &= BITS_OFF[i];
1193                         }
1194                 }
1195             else
1196                 {
1197                     /* --------------
1198                      * set nHolding to zero so that we can garbage collect the lock
1199                      * down below...
1200                      * --------------
1201                      */
1202                     lock->nHolding = 0;
1203                 }
1204             /* ----------------
1205              * always remove the xidLookup entry, we're done with it now
1206              * ----------------
1207              */
1208 #ifdef USER_LOCKS
1209             SHMQueueDelete(&xidLook->queue);
1210 #endif
1211             if ((! hash_search(ltable->xidHash, (Pointer)xidLook, HASH_REMOVE, &found))
1212                 || !found)
1213                 {
1214                     SpinRelease(masterLock);
1215 #ifdef USER_LOCKS
1216                     elog(NOTICE,"LockReleaseAll: xid table corrupted");
1217 #else
1218                     elog(NOTICE,"LockReplace: xid table corrupted");
1219 #endif
1220                     return(FALSE);
1221                 }
1222             
1223             if (! lock->nHolding)
1224                 {
1225                     /* --------------------
1226                      * if there's no one waiting in the queue, we've just released
1227                      * the last lock.
1228                      * --------------------
1229                      */
1230                     
1231                     Assert( ltable->lockHash->hash == tag_hash);
1232                     lock = (LOCK *)
1233                         hash_search(ltable->lockHash,(Pointer)&(lock->tag),HASH_REMOVE, &found);
1234                     if ((! lock) || (!found))
1235                         {
1236                             SpinRelease(masterLock);
1237 #ifdef USER_LOCKS
1238                             elog(NOTICE,"LockReleaseAll: cannot remove lock from HTAB");
1239 #else
1240                             elog(NOTICE,"LockReplace: cannot remove lock from HTAB");
1241 #endif
1242                             return(FALSE);
1243                         }
1244                 }
1245             else
1246                 {
1247                     /* --------------------
1248                      * Wake the first waiting process and grant him the lock if it
1249                      * doesn't conflict.  The woken process must record the lock
1250                      * him/herself.
1251                      * --------------------
1252                      */
1253                     waitQueue = &(lock->waitProcs);
1254                     (void) ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
1255                 }
1256             
1257 #ifdef USER_LOCKS
1258           next_item:
1259 #endif
1260             if (done)
1261                 break;
1262             SHMQueueFirst(&xidLook->queue,(Pointer*)&tmp,&tmp->queue);
1263             xidLook = tmp;
1264         }
1265     SpinRelease(masterLock);
1266 #ifdef USER_LOCKS
1267     /*
1268      * Reinitialize the queue only if nothing has been left in.
1269      */
1270   if (nskip == 0)
1271 #endif
1272     SHMQueueInit(lockQueue);
1273     return TRUE;
1274 }
1275
1276 int
1277 LockShmemSize()
1278 {
1279     int size = 0;
1280     int nLockBuckets, nLockSegs;
1281     int nXidBuckets, nXidSegs;
1282     
1283     nLockBuckets = 1 << (int)my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1);
1284     nLockSegs = 1 << (int)my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1285     
1286     nXidBuckets = 1 << (int)my_log2((NLOCKS_PER_XACT-1) / DEF_FFACTOR + 1);
1287     nXidSegs = 1 << (int)my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1288     
1289     size += MAXALIGN(NBACKENDS * sizeof(PROC)); /* each MyProc */
1290     size += MAXALIGN(NBACKENDS * sizeof(LOCKCTL));      /* each ltable->ctl */
1291     size += MAXALIGN(sizeof(PROC_HDR));         /* ProcGlobal */
1292     
1293     size += MAXALIGN(my_log2(NLOCKENTS) * sizeof(void *));
1294     size += MAXALIGN(sizeof(HHDR));
1295     size += nLockSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1296     size += NLOCKENTS * /* XXX not multiple of BUCKET_ALLOC_INCR? */
1297         (MAXALIGN(sizeof(BUCKET_INDEX)) +
1298          MAXALIGN(sizeof(LOCK))); /* contains hash key */
1299     
1300     size += MAXALIGN(my_log2(NBACKENDS) * sizeof(void *));
1301     size += MAXALIGN(sizeof(HHDR));
1302     size += nXidSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1303     size += NBACKENDS * /* XXX not multiple of BUCKET_ALLOC_INCR? */
1304         (MAXALIGN(sizeof(BUCKET_INDEX)) +
1305          MAXALIGN(sizeof(XIDLookupEnt))); /* contains hash key */
1306     
1307     return size;
1308 }
1309
1310 /* -----------------
1311  * Boolean function to determine current locking status
1312  * -----------------
1313  */
1314 bool
1315 LockingDisabled()
1316 {
1317     return LockingIsDisabled;
1318 }