1 /*-------------------------------------------------------------------------
2  *
3  * lock.c--
4  *        simple lock acquisition
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.21 1998/01/25 05:14:02 momjian Exp $
11  *
12  * NOTES
13  *        Outside modules can create a lock table and acquire/release
14  *        locks.  A lock table is a shared memory hash table.  When
15  *        a process tries to acquire a lock of a type that conflicts
16  *        with existing locks, it is put to sleep using the routines
17  *        in storage/lmgr/proc.c.
18  *
19  *      Interface:
20  *
21  *      LockAcquire(), LockRelease(), LockTabInit().
22  *
23  *      LockReplace() is called only within this module and by the
24  *              lkchain module.  It releases a lock without looking
25  *              the lock up in the lock table.
26  *
27  *      NOTE: This module is used to define new lock tables.  The
28  *              multi-level lock table (multi.c) used by the heap
29  *              access methods calls these routines.  See multi.c for
30  *              examples showing how to use this interface.
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include <stdio.h>                              /* for sprintf() */
35 #include <string.h>
36 #include <sys/types.h>
37 #include <unistd.h>
38
39 #include "postgres.h"
40 #include "miscadmin.h"
41 #include "storage/shmem.h"
42 #include "storage/spin.h"
43 #include "storage/proc.h"
44 #include "storage/lock.h"
45 #include "utils/dynahash.h"
46 #include "utils/hsearch.h"
47 #include "utils/memutils.h"
48 #include "utils/palloc.h"
49 #include "access/xact.h"
50 #include "access/transam.h"
51
52 static int WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock,
53                    LOCKT lockt);
54                    
55 /*#define LOCK_MGR_DEBUG*/
56
57 #ifndef LOCK_MGR_DEBUG
58
59 #define LOCK_PRINT(where,tag,type)
60 #define LOCK_DUMP(where,lock,type)
61 #define LOCK_DUMP_AUX(where,lock,type)
62 #define XID_PRINT(where,xidentP)
63
64 #else                                                   /* LOCK_MGR_DEBUG */
65
66 int                     lockDebug = 0;
67 unsigned int lock_debug_oid_min = BootstrapObjectIdData;
68 static char *lock_types[] = {
69         "NONE",
70         "WRITE",
71         "READ",
72         "WRITE INTENT",
73         "READ INTENT",
74         "EXTEND"
75 };
76
77 #define LOCK_PRINT(where,tag,type)\
78         if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
79                 elog(DEBUG, \
80                          "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
81                          MyProcPid,\
82                          tag->relId, tag->dbId, \
83                          ((tag->tupleId.ip_blkid.bi_hi<<16)+\
84                           tag->tupleId.ip_blkid.bi_lo),\
85                          tag->tupleId.ip_posid, \
86                          lock_types[type])
87
88 #define LOCK_DUMP(where,lock,type)\
89         if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
90                 LOCK_DUMP_AUX(where,lock,type)
91
92 #define LOCK_DUMP_AUX(where,lock,type)\
93                 elog(DEBUG, \
94                          "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
95                          "holders (%d,%d,%d,%d,%d) type (%s)",where, \
96                          MyProcPid,\
97                          lock->tag.relId, lock->tag.dbId, \
98                          ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
99                           lock->tag.tupleId.ip_blkid.bi_lo),\
100                          lock->tag.tupleId.ip_posid, \
101                          lock->nHolding,\
102                          lock->holders[1],\
103                          lock->holders[2],\
104                          lock->holders[3],\
105                          lock->holders[4],\
106                          lock->holders[5],\
107                          lock_types[type])
108
109 #define XID_PRINT(where,xidentP)\
110         if ((lockDebug >= 2) && \
111                 (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
112                  >= lock_debug_oid_min)) \
113                 elog(DEBUG,\
114                          "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
115                          "holders (%d,%d,%d,%d,%d)",\
116                          where,\
117                          MyProcPid,\
118                          xidentP->tag.xid,\
119                          xidentP->tag.pid,\
120                          xidentP->tag.lock,\
121                          xidentP->nHolding,\
122                          xidentP->holders[1],\
123                          xidentP->holders[2],\
124                          xidentP->holders[3],\
125                          xidentP->holders[4],\
126                          xidentP->holders[5])
127
128 #endif                                                  /* LOCK_MGR_DEBUG */
129
130 SPINLOCK        LockMgrLock;            /* in Shmem or created in
131                                                                  * CreateSpinlocks() */
132
133 /* This is to simplify/speed up some bit arithmetic */
134
135 static MASK BITS_OFF[MAX_LOCKTYPES];
136 static MASK BITS_ON[MAX_LOCKTYPES];
137
138 /* -----------------
139  * XXX Want to move this to this file
140  * -----------------
141  */
142 static bool LockingIsDisabled;
143
144 /* -------------------
145  * map from tableId to the lock table structure
146  * -------------------
147  */
148 static LOCKTAB *AllTables[MAX_TABLES];
149
150 /* -------------------
151  * no zero-th table
152  * -------------------
153  */
154 static int      NumTables = 1;
155
156 /* -------------------
157  * InitLocks -- Init the lock module.  Create a private data
158  *              structure for constructing conflict masks.
159  * -------------------
160  */
161 void
162 InitLocks()
163 {
164         int                     i;
165         int                     bit;
166
167         bit = 1;
168         /* -------------------
169          * remember 0th locktype is invalid
170          * -------------------
171          */
172         for (i = 0; i < MAX_LOCKTYPES; i++, bit <<= 1)
173         {
174                 BITS_ON[i] = bit;
175                 BITS_OFF[i] = ~bit;
176         }
177 }
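/*
 * Worked note on the bit arrays built above: after InitLocks(),
 * BITS_ON[i] == (1 << i) and BITS_OFF[i] == ~(1 << i).  GrantLock()
 * ORs BITS_ON[lockt] into lock->mask when a lock of type lockt becomes
 * active, and the release paths AND in BITS_OFF[lockt] once the last
 * active lock of that type is gone, so bit (1 << i) of lock->mask is
 * set exactly while some type-i lock is active.
 */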
178
179 /* -------------------
180  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
181  * ------------------
182  */
183 void
184 LockDisable(int status)
185 {
186         LockingIsDisabled = status;
187 }
188
189
190 /*
191  * LockTypeInit -- initialize the lock table's lock type
192  *              structures
193  *
194  * Notes: just copying.  Should only be called once.
195  */
196 static void
197 LockTypeInit(LOCKTAB *ltable,
198                          MASK *conflictsP,
199                          int *prioP,
200                          int ntypes)
201 {
202         int                     i;
203
204         ltable->ctl->nLockTypes = ntypes;
205         ntypes++;
206         for (i = 0; i < ntypes; i++, prioP++, conflictsP++)
207         {
208                 ltable->ctl->conflictTab[i] = *conflictsP;
209                 ltable->ctl->prio[i] = *prioP;
210         }
211 }
212
213 /*
214  * LockTabInit -- initialize a lock table structure
215  *
216  * Notes:
217  *              (a) a lock table has four separate entries in the binding
218  *              table.  This is because every shared hash table and spinlock
219  *              has its name stored in the binding table at its creation.  It
220  *              is wasteful, in this case, but not much space is involved.
221  *
222  */
223 LockTableId
224 LockTabInit(char *tabName,
225                         MASK *conflictsP,
226                         int *prioP,
227                         int ntypes)
228 {
229         LOCKTAB    *ltable;
230         char       *shmemName;
231         HASHCTL         info;
232         int                     hash_flags;
233         bool            found;
234         int                     status = TRUE;
235
236         if (ntypes > MAX_LOCKTYPES)
237         {
238                 elog(NOTICE, "LockTabInit: too many lock types %d greater than %d",
239                          ntypes, MAX_LOCKTYPES);
240                 return (INVALID_TABLEID);
241         }
242
243         if (NumTables > MAX_TABLES)
244         {
245                 elog(NOTICE,
246                          "LockTabInit: system limit of MAX_TABLES (%d) lock tables",
247                          MAX_TABLES);
248                 return (INVALID_TABLEID);
249         }
250
251         /* allocate a string for the binding table lookup */
252         shmemName = (char *) palloc((unsigned) (strlen(tabName) + 32));
253         if (!shmemName)
254         {
255                 elog(NOTICE, "LockTabInit: couldn't malloc string %s \n", tabName);
256                 return (INVALID_TABLEID);
257         }
258
259         /* each lock table has a non-shared header */
260         ltable = (LOCKTAB *) palloc((unsigned) sizeof(LOCKTAB));
261         if (!ltable)
262         {
263                 elog(NOTICE, "LockTabInit: couldn't malloc lock table %s\n", tabName);
264                 pfree(shmemName);
265                 return (INVALID_TABLEID);
266         }
267
268         /* ------------------------
269          * find/acquire the spinlock for the table
270          * ------------------------
271          */
272         SpinAcquire(LockMgrLock);
273
274
275         /* -----------------------
276          * allocate a control structure from shared memory or attach to it
277          * if it already exists.
278          * -----------------------
279          */
280         sprintf(shmemName, "%s (ctl)", tabName);
281         ltable->ctl = (LOCKCTL *)
282                 ShmemInitStruct(shmemName, (unsigned) sizeof(LOCKCTL), &found);
283
284         if (!ltable->ctl)
285         {
286                 elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
287                 status = FALSE;
288         }
289
290         /* ----------------
291          * we're first - initialize
292          * ----------------
293          */
294         if (!found)
295         {
296                 MemSet(ltable->ctl, 0, sizeof(LOCKCTL));
297                 ltable->ctl->masterLock = LockMgrLock;
298                 ltable->ctl->tableId = NumTables;
299         }
300
301         /* --------------------
302          * other modules refer to the lock table by a tableId
303          * --------------------
304          */
305         AllTables[NumTables] = ltable;
306         NumTables++;
307         Assert(NumTables <= MAX_TABLES);
308
309         /* ----------------------
310          * allocate a hash table for the lock tags.  This is used
311          * to find the different locks.
312          * ----------------------
313          */
314         info.keysize = sizeof(LOCKTAG);
315         info.datasize = sizeof(LOCK);
316         info.hash = tag_hash;
317         hash_flags = (HASH_ELEM | HASH_FUNCTION);
318
319         sprintf(shmemName, "%s (lock hash)", tabName);
320         ltable->lockHash = (HTAB *) ShmemInitHash(shmemName,
321                                                                                  INIT_TABLE_SIZE, MAX_TABLE_SIZE,
322                                                                                           &info, hash_flags);
323
324         Assert(ltable->lockHash->hash == tag_hash);
325         if (!ltable->lockHash)
326         {
327                 elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
328                 status = FALSE;
329         }
330
331         /* -------------------------
332          * allocate an xid table.  When different transactions hold
333          * the same lock, additional information must be saved (locks per tx).
334          * -------------------------
335          */
336         info.keysize = XID_TAGSIZE;
337         info.datasize = sizeof(XIDLookupEnt);
338         info.hash = tag_hash;
339         hash_flags = (HASH_ELEM | HASH_FUNCTION);
340
341         sprintf(shmemName, "%s (xid hash)", tabName);
342         ltable->xidHash = (HTAB *) ShmemInitHash(shmemName,
343                                                                                  INIT_TABLE_SIZE, MAX_TABLE_SIZE,
344                                                                                          &info, hash_flags);
345
346         if (!ltable->xidHash)
347         {
348                 elog(FATAL, "LockTabInit: couldn't initialize %s", tabName);
349                 status = FALSE;
350         }
351
352         /* init ctl data structures */
353         LockTypeInit(ltable, conflictsP, prioP, ntypes);
354
355         SpinRelease(LockMgrLock);
356
357         pfree(shmemName);
358
359         if (status)
360                 return (ltable->ctl->tableId);
361         else
362                 return (INVALID_TABLEID);
363 }
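/*
 * Illustrative sketch of the interface described in the file header:
 * an outside module creates a lock table once and then acquires and
 * releases locks through it (multi.c is the real example).  The two
 * lock types, conflict masks, priorities and the LOCK_EXAMPLE guard
 * below are invented for illustration only.
 */
#ifdef LOCK_EXAMPLE
static void
ExampleLockUsage(Oid relid, Oid dbid)
{
	/*
	 * Type 1 ("exclusive") conflicts with types 1 and 2; type 2
	 * ("shared") conflicts only with type 1.  The bit for type i is
	 * (1 << i); entry 0 is the invalid type.
	 */
	static MASK exampleConflicts[] = {0, 0x6, 0x2};
	static int	examplePrios[] = {0, 2, 1};
	static LockTableId exampleTableId = 0;
	LOCKTAG		tag;

	/* create the table once; real table ids handed out here are >= 1 */
	if (exampleTableId == 0)
	{
		exampleTableId = LockTabInit("example lock table",
									 exampleConflicts, examplePrios, 2);
		if (exampleTableId == INVALID_TABLEID)
			elog(ERROR, "ExampleLockUsage: could not initialize lock table");
	}

	/* zero the whole tag so padding bytes hash consistently */
	MemSet(&tag, 0, sizeof(LOCKTAG));
	tag.relId = relid;
	tag.dbId = dbid;

	if (LockAcquire(exampleTableId, &tag, (LOCKT) 1))
	{
		/* ... work on the relation while the exclusive lock is held ... */
		LockRelease(exampleTableId, &tag, (LOCKT) 1);
	}
}
#endif							/* LOCK_EXAMPLE */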
364
365 /*
366  * LockTabRename -- allocate another tableId to the same
367  *              lock table.
368  *
369  * NOTES: Both the lock module and the lock chain (lchain.c)
370  *              module use table id's to distinguish between different
371  *              kinds of locks.  Short term and long term locks look
372  *              the same to the lock table, but are handled differently
373  *              by the lock chain manager.      This function allows the
374  *              client to use different tableIds when acquiring/releasing
375  *              short term and long term locks.
376  */
377 #ifdef NOT_USED
378 LockTableId
379 LockTabRename(LockTableId tableId)
380 {
381         LockTableId newTableId;
382
383         if (NumTables >= MAX_TABLES)
384         {
385                 return (INVALID_TABLEID);
386         }
387         if (AllTables[tableId] == INVALID_TABLEID)
388         {
389                 return (INVALID_TABLEID);
390         }
391
392         /* other modules refer to the lock table by a tableId */
393         newTableId = NumTables;
394         NumTables++;
395
396         AllTables[newTableId] = AllTables[tableId];
397         return (newTableId);
398 }
399
400 #endif
401
402 /*
403  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
404  *              set lock if/when no conflicts.
405  *
406  * Returns: TRUE if parameters are correct, FALSE otherwise.
407  *
408  * Side Effects: The lock is always acquired.  No way to abort
409  *              a lock acquisition other than aborting the transaction.
410  *              Lock is recorded in the lkchain.
411 #ifdef USER_LOCKS
412  * Note on User Locks:
413  *              User locks are handled totally on the application side as
414  *              long term cooperative locks which extend beyond the normal
415  *              transaction boundaries.  Their purpose is to indicate to an
416  *              application that someone is `working' on an item.  So it is
417  *              possible to put a user lock on a tuple's oid, retrieve the
418  *              tuple, work on it for an hour and then update it and remove
419  *              the lock.  While the lock is active other clients can still
420  *              read and write the tuple, but they can see that it has been
421  *              locked at the application level by someone.
422  *              User locks use lock tags made of a uint16 and a uint32, for
423  *              example 0 and a tuple oid, or any other arbitrary pair of
424  *              numbers following a convention established by the application.
425  *              In this sense tags don't refer to tuples or database entities.
426  *              User locks and normal locks are completely orthogonal and
427  *              they don't interfere with each other, so it is possible
428  *              to acquire a normal lock on a user-locked tuple or user-lock
429  *              a tuple for which a normal write lock already exists.
430  *              User locks are always non-blocking, therefore they are never
431  *              acquired if already held by another process.  They must be
432  *              released explicitly by the application but they are released
433  *              automatically when a backend terminates.
434  *              They are indicated by a dummy tableId 0 which doesn't have
435  *              any table allocated but uses the normal lock table, and are
436  *              distinguished from normal locks by the following differences:
437  *
438  *                                                                              normal lock             user lock
439  *
440  *              tableId                                                 1                               0
441  *              tag.relId                                               rel oid                 0
442  *              tag.ItemPointerData.ip_blkid    block id                lock id2
443  *              tag.ItemPointerData.ip_posid    tuple offset    lock id1
444  *              xid.pid                                                 0                               backend pid
445  *              xid.xid                                                 current xid             0
446  *              persistence                                             transaction             user or backend
447  *
448  *              The lockt parameter can take the same values as for normal locks,
449  *              although probably only WRITE_LOCK has any practical use.
450  *
451  *                                                                                                              DZ - 4 Oct 1996
452 #endif
453  */
454
455 bool
456 LockAcquire(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
457 {
458         XIDLookupEnt *result,
459                                 item;
460         HTAB       *xidTable;
461         bool            found;
462         LOCK       *lock = NULL;
463         SPINLOCK        masterLock;
464         LOCKTAB    *ltable;
465         int                     status;
466         TransactionId myXid;
467
468 #ifdef USER_LOCKS
469         int                     is_user_lock;
470
471         is_user_lock = (tableId == 0);
472         if (is_user_lock)
473         {
474                 tableId = 1;
475 #ifdef USER_LOCKS_DEBUG
476                 elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
477                          lockName->tupleId.ip_posid,
478                          ((lockName->tupleId.ip_blkid.bi_hi << 16) +
479                           lockName->tupleId.ip_blkid.bi_lo),
480                          lockt);
481 #endif
482         }
483 #endif
484
485         Assert(tableId < NumTables);
486         ltable = AllTables[tableId];
487         if (!ltable)
488         {
489                 elog(NOTICE, "LockAcquire: bad lock table %d", tableId);
490                 return (FALSE);
491         }
492
493         if (LockingIsDisabled)
494         {
495                 return (TRUE);
496         }
497
498         LOCK_PRINT("Acquire", lockName, lockt);
499         masterLock = ltable->ctl->masterLock;
500
501         SpinAcquire(masterLock);
502
503         Assert(ltable->lockHash->hash == tag_hash);
504         lock = (LOCK *) hash_search(ltable->lockHash, (Pointer) lockName, HASH_ENTER, &found);
505
506         if (!lock)
507         {
508                 SpinRelease(masterLock);
509                 elog(FATAL, "LockAcquire: lock table %d is corrupted", tableId);
510                 return (FALSE);
511         }
512
513         /* --------------------
514          * if there was nothing else there, complete initialization
515          * --------------------
516          */
517         if (!found)
518         {
519                 lock->mask = 0;
520                 ProcQueueInit(&(lock->waitProcs));
521                 MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKTYPES);
522                 MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKTYPES);
523                 lock->nHolding = 0;
524                 lock->nActive = 0;
525
526                 Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
527                                                          &(lockName->tupleId.ip_blkid)));
528
529         }
530
531         /* ------------------
532          * add an element to the lock queue so that we can clear the
533          * locks at end of transaction.
534          * ------------------
535          */
536         xidTable = ltable->xidHash;
537         myXid = GetCurrentTransactionId();
538
539         /* ------------------
540          * Zero out all of the tag bytes (this clears the padding bytes for long
541          * word alignment and ensures hashing consistency).
542          * ------------------
543          */
544         MemSet(&item, 0, XID_TAGSIZE); /* must clear padding, needed */
545         TransactionIdStore(myXid, &item.tag.xid);
546         item.tag.lock = MAKE_OFFSET(lock);
547 #if 0
548         item.tag.pid = MyPid;
549 #endif
550
551 #ifdef USER_LOCKS
552         if (is_user_lock)
553         {
554                 item.tag.pid = MyProcPid;
555                 item.tag.xid = myXid = 0;
556 #ifdef USER_LOCKS_DEBUG
557                 elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
558                          item.tag.lock, item.tag.pid, item.tag.xid);
559 #endif
560         }
561 #endif
562
563         result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
564         if (!result)
565         {
566                 SpinRelease(masterLock);
566                 elog(NOTICE, "LockAcquire: xid table corrupted");
567                 return (FALSE);
568         }
569         if (!found)
570         {
571                 XID_PRINT("LockAcquire: queueing XidEnt", result);
572                 ProcAddLock(&result->queue);
573                 result->nHolding = 0;
574                 MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKTYPES);
575         }
576
577         /* ----------------
578          * lock->nHolding tells us how many processes have _tried_ to
579          * acquire this lock, regardless of whether they succeeded or
580          * failed in doing so.
581          * ----------------
582          */
583         lock->nHolding++;
584         lock->holders[lockt]++;
585
586         /* --------------------
587          * If every lock currently active on this object is held by me
588          * (result->nHolding == lock->nActive), there cannot be a
589          * conflict, so grant the lock immediately.  Note that the
590          * lock->nHolding bumped above plays no part in this test.
591          * --------------------
592          */
593         if (result->nHolding == lock->nActive)
594         {
595                 result->holders[lockt]++;
596                 result->nHolding++;
597                 GrantLock(lock, lockt);
598                 SpinRelease(masterLock);
599                 return (TRUE);
600         }
601
602         Assert(result->nHolding <= lock->nActive);
603
604         status = LockResolveConflicts(ltable, lock, lockt, myXid);
605
606         if (status == STATUS_OK)
607         {
608                 GrantLock(lock, lockt);
609         }
610         else if (status == STATUS_FOUND)
611         {
612 #ifdef USER_LOCKS
613
614                 /*
615                  * User locks are non blocking. If we can't acquire a lock remove
616                  * the xid entry and return FALSE without waiting.
617                  */
618                 if (is_user_lock)
619                 {
620                         if (!result->nHolding)
621                         {
622                                 SHMQueueDelete(&result->queue);
623                                 hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
624                         }
625                         lock->nHolding--;
626                         lock->holders[lockt]--;
627                         SpinRelease(masterLock);
628 #ifdef USER_LOCKS_DEBUG
629                         elog(NOTICE, "LockAcquire: user lock failed");
630 #endif
631                         return (FALSE);
632                 }
633 #endif
634                 status = WaitOnLock(ltable, tableId, lock, lockt);
635                 XID_PRINT("Someone granted me the lock", result);
636         }
637
638         SpinRelease(masterLock);
639
640         return (status == STATUS_OK);
641 }
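/*
 * Illustrative sketch of the user-lock usage described in the note
 * above LockAcquire (assumes USER_LOCKS is compiled in).  The dummy
 * tableId 0 selects user-lock behaviour, the tag carries an arbitrary
 * (uint16, uint32) pair -- here (0, an item's oid) -- and the acquire
 * never blocks.  The LOCK_EXAMPLE guard, the function name and the use
 * of lock type 1 ("WRITE" in lock_types above) are invented for the
 * example.
 */
#ifdef USER_LOCKS
#ifdef LOCK_EXAMPLE
static bool
ExampleUserLock(Oid itemOid)
{
	LOCKTAG		tag;

	/* zero the tag first so padding bytes hash consistently */
	MemSet(&tag, 0, sizeof(LOCKTAG));
	tag.relId = 0;				/* user locks keep relId 0 (see table above) */
	tag.tupleId.ip_posid = 0;	/* lock id1: the uint16 half of the tag */
	tag.tupleId.ip_blkid.bi_hi = (uint16) (itemOid >> 16);		/* lock id2 */
	tag.tupleId.ip_blkid.bi_lo = (uint16) (itemOid & 0xffff);

	/* tableId 0 never blocks; FALSE means someone else already holds it */
	if (!LockAcquire(0, &tag, (LOCKT) 1))
		return FALSE;

	/* ... the application works on the item, possibly across transactions ... */

	return LockRelease(0, &tag, (LOCKT) 1);
}
#endif							/* LOCK_EXAMPLE */
#endif							/* USER_LOCKS */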
642
643 /* ----------------------------
644  * LockResolveConflicts -- test for lock conflicts
645  *
646  * NOTES:
647  *              Here's what makes this complicated: one transaction's
648  * locks don't conflict with one another.  When many processes
649  * hold locks, each has to subtract off its own locks when
650  * determining whether or not a newly requested lock conflicts with
651  * the existing ones.
652  *
653  *      For example, if I am already holding a WRITE_INTENT lock,
654  *      there will not be a conflict with my own READ_LOCK.  If I
655  *      don't consider the intent lock when checking for conflicts,
656  *      I find no conflict.
657  * ----------------------------
658  */
659 int
660 LockResolveConflicts(LOCKTAB *ltable,
661                                          LOCK *lock,
662                                          LOCKT lockt,
663                                          TransactionId xid)
664 {
665         XIDLookupEnt *result,
666                                 item;
667         int                *myHolders;
668         int                     nLockTypes;
669         HTAB       *xidTable;
670         bool            found;
671         int                     bitmask;
672         int                     i,
673                                 tmpMask;
674
675         nLockTypes = ltable->ctl->nLockTypes;
676         xidTable = ltable->xidHash;
677
678         /* ---------------------
679          * read my own statistics from the xid table.  If there
680          * isn't an entry, then we'll just add one.
681          *
682          * Zero out the tag, this clears the padding bytes for long
683          * word alignment and ensures hashing consistency.
684          * ------------------
685          */
686         MemSet(&item, 0, XID_TAGSIZE);
687         TransactionIdStore(xid, &item.tag.xid);
688         item.tag.lock = MAKE_OFFSET(lock);
689 #if 0
690         item.tag.pid = pid;
691 #endif
692
693         if (!(result = (XIDLookupEnt *)
694                   hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
695         {
696                 elog(NOTICE, "LockResolveConflicts: xid table corrupted");
697                 return (STATUS_ERROR);
698         }
699         myHolders = result->holders;
700
701         if (!found)
702         {
703                 /* ---------------
704                  * we're not holding any type of lock yet.  Clear
705                  * the lock stats.
706                  * ---------------
707                  */
708                 MemSet(result->holders, 0, nLockTypes * sizeof(*(lock->holders)));
709                 result->nHolding = 0;
710         }
711
712         {
713                 /* ------------------------
714                  * If someone with a greater priority is waiting for the lock,
715                  * do not continue and share the lock, even if we can.  bjm
716                  * ------------------------
717                  */
718                 int                             myprio = ltable->ctl->prio[lockt];
719                 PROC_QUEUE              *waitQueue = &(lock->waitProcs);
720                 PROC                    *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
721
722                 if (waitQueue->size && topproc->prio > myprio)
723                         return STATUS_FOUND;
724         }
725
726         /* ----------------------------
727          * first check for global conflicts: If no locks conflict
728          * with mine, then I get the lock.
729          *
730          * Checking for conflict: lock->mask represents the types of
731          * currently held locks.  conflictTab[lockt] has a bit
732          * set for each type of lock that conflicts with mine.  Bitwise
733          * compare tells if there is a conflict.
734          * ----------------------------
735          */
736         if (!(ltable->ctl->conflictTab[lockt] & lock->mask))
737         {
738
739                 result->holders[lockt]++;
740                 result->nHolding++;
741
742                 XID_PRINT("Conflict Resolved: updated xid entry stats", result);
743
744                 return (STATUS_OK);
745         }
746
747         /* ------------------------
748          * Rats.  Something conflicts. But it could still be my own
749          * lock.  We have to construct a conflict mask
750          * that does not reflect our own locks.
751          * ------------------------
752          */
753         bitmask = 0;
754         tmpMask = 2;
755         for (i = 1; i <= nLockTypes; i++, tmpMask <<= 1)
756         {
757                 if (lock->activeHolders[i] - myHolders[i])
758                 {
759                         bitmask |= tmpMask;
760                 }
761         }
762
763         /* ------------------------
764          * now check again for conflicts.  'bitmask' describes the types
765          * of locks held by other processes.  If one of these
766          * conflicts with the kind of lock that I want, there is a
767          * conflict and I have to sleep.
768          * ------------------------
769          */
770         if (!(ltable->ctl->conflictTab[lockt] & bitmask))
771         {
772
773                 /* no conflict. Get the lock and go on */
774
775                 result->holders[lockt]++;
776                 result->nHolding++;
777
778                 XID_PRINT("Conflict Resolved: updated xid entry stats", result);
779
780                 return (STATUS_OK);
781
782         }
783
784         return (STATUS_FOUND);
785 }
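/*
 * Worked example of the conflict arithmetic above (the conflictTab
 * values are illustrative; the real ones come from the caller of
 * LockTabInit).  Say type 1 is "WRITE" and type 2 is "READ", with
 * conflictTab[2] = 0x2, i.e. READ conflicts with WRITE, and the bit
 * for type i being (1 << i).  I already hold one WRITE lock, so
 * lock->mask = 0x2, and I now request a READ lock:
 *
 *		first test:   conflictTab[2] & lock->mask = 0x2 & 0x2 != 0,
 *					  an apparent conflict;
 *		second test:  activeHolders[1] - myHolders[1] = 1 - 1 = 0, so
 *					  the rebuilt bitmask is 0 and conflictTab[2] & 0 == 0.
 *
 * The only conflicting lock was my own, so the READ request succeeds.
 */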
786
787 static int
788 WaitOnLock(LOCKTAB *ltable, LockTableId tableId, LOCK *lock, LOCKT lockt)
789 {
790         PROC_QUEUE *waitQueue = &(lock->waitProcs);
791
792         int                     prio = ltable->ctl->prio[lockt];
793
794         /*
795          * the waitqueue is ordered by priority. I insert myself according to
796          * the priority of the lock I am acquiring.
797          *
798          * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
799          * synchronization for this queue.      That will not be true if/when
800          * people can be deleted from the queue by a SIGINT or something.
801          */
802         LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockt);
803         if (ProcSleep(waitQueue,
804                                   ltable->ctl->masterLock,
805                                   lockt,
806                                   prio,
807                                   lock) != NO_ERROR)
808         {
809                 /* -------------------
810                  * This could have happened as a result of a deadlock; see HandleDeadLock().
811                  * Decrement the lock nHolding and holders fields as we are no longer
812                  * waiting on this lock.
813                  * -------------------
814                  */
815                 lock->nHolding--;
816                 lock->holders[lockt]--;
817                 LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockt);
818                 SpinRelease(ltable->ctl->masterLock);
819                 elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
820         }
821
822         LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockt);
823         return (STATUS_OK);
824 }
825
826 /*
827  * LockRelease -- look up 'lockName' in lock table 'tableId' and
828  *              release it.
829  *
830  * Side Effects: if the lock no longer conflicts with the highest
831  *              priority waiting process, that process is granted the lock
832  *              and awoken. (We have to grant the lock here to avoid a
833  *              race between the waking process and any new process to
834  *              come along and request the lock).
835  */
836 bool
837 LockRelease(LockTableId tableId, LOCKTAG *lockName, LOCKT lockt)
838 {
839         LOCK       *lock = NULL;
840         SPINLOCK        masterLock;
841         bool            found;
842         LOCKTAB    *ltable;
843         XIDLookupEnt *result,
844                                 item;
845         HTAB       *xidTable;
846         bool            wakeupNeeded = true;
847
848 #ifdef USER_LOCKS
849         int                     is_user_lock;
850
851         is_user_lock = (tableId == 0);
852         if (is_user_lock)
853         {
854                 tableId = 1;
855 #ifdef USER_LOCKS_DEBUG
856                 elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
857                          lockName->tupleId.ip_posid,
858                          ((lockName->tupleId.ip_blkid.bi_hi << 16) +
859                           lockName->tupleId.ip_blkid.bi_lo),
860                          lockt);
861 #endif
862         }
863 #endif
864
865         Assert(tableId < NumTables);
866         ltable = AllTables[tableId];
867         if (!ltable)
868         {
869                 elog(NOTICE, "ltable is null in LockRelease");
870                 return (FALSE);
871         }
872
873         if (LockingIsDisabled)
874         {
875                 return (TRUE);
876         }
877
878         LOCK_PRINT("Release", lockName, lockt);
879
880         masterLock = ltable->ctl->masterLock;
881         xidTable = ltable->xidHash;
882
883         SpinAcquire(masterLock);
884
885         Assert(ltable->lockHash->hash == tag_hash);
886         lock = (LOCK *)
887                 hash_search(ltable->lockHash, (Pointer) lockName, HASH_FIND_SAVE, &found);
888
889 #ifdef USER_LOCKS
890
891         /*
892          * If the entry is not found hash_search returns TRUE instead of NULL,
893          * so we must check it explicitly.
894          */
895         if ((is_user_lock) && (lock == (LOCK *) TRUE))
896         {
897                 SpinRelease(masterLock);
898                 elog(NOTICE, "LockRelease: there are no locks with this tag");
899                 return (FALSE);
900         }
901 #endif
902
903         /*
904          * let the caller print its own error message, too. Do not elog(ERROR).
905          */
906         if (!lock)
907         {
908                 SpinRelease(masterLock);
909                 elog(NOTICE, "LockRelease: locktable corrupted");
910                 return (FALSE);
911         }
912
913         if (!found)
914         {
915                 SpinRelease(masterLock);
916                 elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
917                 return (FALSE);
918         }
919
920         Assert(lock->nHolding > 0);
921
922 #ifdef USER_LOCKS
923
924         /*
925          * If this is a user lock it can be removed only after checking that
926          * it was acquired by the current process, so this code is skipped and
927          * executed later.
928          */
929         if (!is_user_lock)
930         {
931 #endif
932
933                 /*
934                  * fix the general lock stats
935                  */
936                 lock->nHolding--;
937                 lock->holders[lockt]--;
938                 lock->nActive--;
939                 lock->activeHolders[lockt]--;
940
941                 Assert(lock->nActive >= 0);
942
943                 if (!lock->nHolding)
944                 {
945                         /* ------------------
946                          * if there's no one waiting in the queue,
947                          * we just released the last lock.
948                          * Delete it from the lock table.
949                          * ------------------
950                          */
951                         Assert(ltable->lockHash->hash == tag_hash);
952                         lock = (LOCK *) hash_search(ltable->lockHash,
953                                                                                 (Pointer) &(lock->tag),
954                                                                                 HASH_REMOVE_SAVED,
955                                                                                 &found);
956                         Assert(lock && found);
957                         wakeupNeeded = false;
958                 }
959 #ifdef USER_LOCKS
960         }
961 #endif
962
963         /* ------------------
964          * Zero out all of the tag bytes (this clears the padding bytes for long
965          * word alignment and ensures hashing consistency).
966          * ------------------
967          */
968         MemSet(&item, 0, XID_TAGSIZE);
969
970         TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
971         item.tag.lock = MAKE_OFFSET(lock);
972 #if 0
973         item.tag.pid = MyPid;
974 #endif
975
976 #ifdef USER_LOCKS
977         if (is_user_lock)
978         {
979                 item.tag.pid = MyProcPid;
980                 item.tag.xid = 0;
981 #ifdef USER_LOCKS_DEBUG
982                 elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
983                          item.tag.lock, item.tag.pid, item.tag.xid);
984 #endif
985         }
986 #endif
987
988         if (!(result = (XIDLookupEnt *) hash_search(xidTable,
989                                                                                                 (Pointer) &item,
990                                                                                                 HASH_FIND_SAVE,
991                                                                                                 &found))
992                 || !found)
993         {
994                 SpinRelease(masterLock);
995 #ifdef USER_LOCKS
996                 if ((is_user_lock) && (result))
997                 {
998                         elog(NOTICE, "LockRelease: you don't have a lock on this tag");
999                 }
1000                 else
1001                 {
1002                         elog(NOTICE, "LockRelease: find xid, table corrupted");
1003                 }
1004 #else
1005                 elog(NOTICE, "LockReplace: xid table corrupted");
1006 #endif
1007                 return (FALSE);
1008         }
1009
1010         /*
1011          * now check to see if I have any private locks.  If I do, decrement
1012          * the counts associated with them.
1013          */
1014         result->holders[lockt]--;
1015         result->nHolding--;
1016
1017         XID_PRINT("LockRelease updated xid stats", result);
1018
1019         /*
1020          * If this was my last hold on this lock, delete my entry in the XID
1021          * table.
1022          */
1023         if (!result->nHolding)
1024         {
1025 #ifdef USER_LOCKS
1026                 if (result->queue.prev == INVALID_OFFSET)
1027                 {
1028                         elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
1029                 }
1030                 if (result->queue.next == INVALID_OFFSET)
1031                 {
1032                         elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
1033                 }
1034 #endif
1035                 if (result->queue.next != INVALID_OFFSET)
1036                         SHMQueueDelete(&result->queue);
1037                 if (!(result = (XIDLookupEnt *)
1038                           hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
1039                         !found)
1040                 {
1041                         SpinRelease(masterLock);
1042 #ifdef USER_LOCKS
1043                         elog(NOTICE, "LockRelease: remove xid, table corrupted");
1044 #else
1045                         elog(NOTICE, "LockReplace: xid table corrupted");
1046 #endif
1047                         return (FALSE);
1048                 }
1049         }
1050
1051 #ifdef USER_LOCKS
1052
1053         /*
1054          * If this is a user lock, remove it now, after the corresponding xid
1055          * entry has been found and deleted.
1056          */
1057         if (is_user_lock)
1058         {
1059
1060                 /*
1061                  * fix the general lock stats
1062                  */
1063                 lock->nHolding--;
1064                 lock->holders[lockt]--;
1065                 lock->nActive--;
1066                 lock->activeHolders[lockt]--;
1067
1068                 Assert(lock->nActive >= 0);
1069
1070                 if (!lock->nHolding)
1071                 {
1072                         /* ------------------
1073                          * if there's no one waiting in the queue,
1074                          * we just released the last lock.
1075                          * Delete it from the lock table.
1076                          * ------------------
1077                          */
1078                         Assert(ltable->lockHash->hash == tag_hash);
1079                         lock = (LOCK *) hash_search(ltable->lockHash,
1080                                                                                 (Pointer) &(lock->tag),
1081                                                                                 HASH_REMOVE,
1082                                                                                 &found);
1083                         Assert(lock && found);
1084                         wakeupNeeded = false;
1085                 }
1086         }
1087 #endif
1088
1089         /* --------------------------
1090          * If there are still active locks of the type I just released, no one
1091          * should be woken up.  Whoever is asleep will still conflict
1092          * with the remaining locks.
1093          * --------------------------
1094          */
1095         if (!(lock->activeHolders[lockt]))
1096         {
1097                 /* change the conflict mask.  No more of this lock type. */
1098                 lock->mask &= BITS_OFF[lockt];
1099         }
1100
1101         if (wakeupNeeded)
1102         {
1103                 /* --------------------------
1104                  * Wake the first waiting process and grant him the lock if it
1105                  * doesn't conflict.  The woken process must record the lock
1106                  * himself.
1107                  * --------------------------
1108                  */
1109                 ProcLockWakeup(&(lock->waitProcs), (char *) ltable, (char *) lock);
1110         }
1111
1112         SpinRelease(masterLock);
1113         return (TRUE);
1114 }
1115
1116 /*
1117  * GrantLock -- update the lock data structure to show
1118  *              the new lock holder.
1119  */
1120 void
1121 GrantLock(LOCK *lock, LOCKT lockt)
1122 {
1123         lock->nActive++;
1124         lock->activeHolders[lockt]++;
1125         lock->mask |= BITS_ON[lockt];
1126 }
1127
1128 #ifdef USER_LOCKS
1129 /*
1130  * LockReleaseAll -- Release all locks in a process lock queue.
1131  *
1132  * Note: This code is a little complicated by the presence in the
1133  *               same queue of user locks which can't be removed from the
1134  *               normal lock queue at the end of a transaction. They must
1135  *               however be removed when the backend exits.
1136  *               A dummy tableId 0 is used to indicate that we are releasing
1137  *               the user locks, from the code added to ProcKill().
1138  */
1139 #endif
1140 bool
1141 LockReleaseAll(LockTableId tableId, SHM_QUEUE *lockQueue)
1142 {
1143         PROC_QUEUE *waitQueue;
1144         int                     done;
1145         XIDLookupEnt *xidLook = NULL;
1146         XIDLookupEnt *tmp = NULL;
1147         SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1148         SPINLOCK        masterLock;
1149         LOCKTAB    *ltable;
1150         int                     i,
1151                                 nLockTypes;
1152         LOCK       *lock;
1153         bool            found;
1154
1155 #ifdef USER_LOCKS
1156         int                     is_user_lock_table,
1157                                 count,
1158                                 nskip;
1159
1160         is_user_lock_table = (tableId == 0);
1161 #ifdef USER_LOCKS_DEBUG
1162         elog(NOTICE, "LockReleaseAll: tableId=%d, pid=%d", tableId, MyProcPid);
1163 #endif
1164         if (is_user_lock_table)
1165         {
1166                 tableId = 1;
1167         }
1168 #endif
1169
1170         Assert(tableId < NumTables);
1171         ltable = AllTables[tableId];
1172         if (!ltable)
1173                 return (FALSE);
1174
1175         nLockTypes = ltable->ctl->nLockTypes;
1176         masterLock = ltable->ctl->masterLock;
1177
1178         if (SHMQueueEmpty(lockQueue))
1179                 return TRUE;
1180
1181 #ifdef USER_LOCKS
1182         SpinAcquire(masterLock);
1183 #endif
1184         SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1185
1186         XID_PRINT("LockReleaseAll", xidLook);
1187
1188 #ifndef USER_LOCKS
1189         SpinAcquire(masterLock);
1190 #else
1191         count = nskip = 0;
1192 #endif
1193         for (;;)
1194         {
1195                 /* ---------------------------
1196                  * XXX Here we assume the shared memory queue is circular and
1197                  * that we know its internal structure.  Should have some sort of
1198                  * macros to allow one to walk it.      mer 20 July 1991
1199                  * ---------------------------
1200                  */
1201                 done = (xidLook->queue.next == end);
1202                 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1203
1204                 LOCK_PRINT("ReleaseAll", (&lock->tag), 0);
1205
1206 #ifdef USER_LOCKS
1207
1208                 /*
1209                  * Sometimes the queue appears to be messed up.
1210                  */
1211                 if (count++ > 2000)
1212                 {
1213                         elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
1214                         nskip = 0;
1215                         break;
1216                 }
1217                 if (is_user_lock_table)
1218                 {
1219                         if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
1220                         {
1221 #ifdef USER_LOCKS_DEBUG
1222                                 elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
1223                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1224 #endif
1225                                 nskip++;
1226                                 goto next_item;
1227                         }
1228                         if (xidLook->tag.pid != MyProcPid)
1229                         {
1230                                 /* This should never happen */
1231 #ifdef USER_LOCKS_DEBUG
1232                                 elog(NOTICE,
1233                                          "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
1234                                          lock->tag.tupleId.ip_posid,
1235                                          ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1236                                           lock->tag.tupleId.ip_blkid.bi_lo),
1237                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1238 #endif
1239                                 nskip++;
1240                                 goto next_item;
1241                         }
1242 #ifdef USER_LOCKS_DEBUG
1243                         elog(NOTICE,
1244                                  "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
1245                                  lock->tag.tupleId.ip_posid,
1246                                  ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1247                                   lock->tag.tupleId.ip_blkid.bi_lo),
1248                                  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1249 #endif
1250                 }
1251                 else
1252                 {
1253                         if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
1254                         {
1255 #ifdef USER_LOCKS_DEBUG
1256                                 elog(NOTICE,
1257                                          "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
1258                                          lock->tag.tupleId.ip_posid,
1259                                          ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1260                                           lock->tag.tupleId.ip_blkid.bi_lo),
1261                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1262 #endif
1263                                 nskip++;
1264                                 goto next_item;
1265                         }
1266 #ifdef USER_LOCKS_DEBUG
1267                         elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
1268                                  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1269 #endif
1270                 }
1271 #endif
1272
1273                 /* ------------------
1274                  * fix the general lock stats
1275                  * ------------------
1276                  */
1277                 if (lock->nHolding != xidLook->nHolding)
1278                 {
1279                         lock->nHolding -= xidLook->nHolding;
1280                         lock->nActive -= xidLook->nHolding;
1281                         Assert(lock->nActive >= 0);
1282                         for (i = 1; i <= nLockTypes; i++)
1283                         {
1284                                 lock->holders[i] -= xidLook->holders[i];
1285                                 lock->activeHolders[i] -= xidLook->holders[i];
1286                                 if (!lock->activeHolders[i])
1287                                         lock->mask &= BITS_OFF[i];
1288                         }
1289                 }
1290                 else
1291                 {
1292                         /* --------------
1293                          * set nHolding to zero so that we can garbage collect the lock
1294                          * down below...
1295                          * --------------
1296                          */
1297                         lock->nHolding = 0;
1298                 }
1299                 /* ----------------
1300                  * always remove the xidLookup entry, we're done with it now
1301                  * ----------------
1302                  */
1303 #ifdef USER_LOCKS
1304                 SHMQueueDelete(&xidLook->queue);
1305 #endif
1306                 if ((!hash_search(ltable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
1307                         || !found)
1308                 {
1309                         SpinRelease(masterLock);
1310 #ifdef USER_LOCKS
1311                         elog(NOTICE, "LockReleaseAll: xid table corrupted");
1312 #else
1313                         elog(NOTICE, "LockReplace: xid table corrupted");
1314 #endif
1315                         return (FALSE);
1316                 }
1317
1318                 if (!lock->nHolding)
1319                 {
1320                         /* --------------------
1321                          * if there's no one waiting in the queue, we've just released
1322                          * the last lock.
1323                          * --------------------
1324                          */
1325
1326                         Assert(ltable->lockHash->hash == tag_hash);
1327                         lock = (LOCK *)
1328                                 hash_search(ltable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
1329                         if ((!lock) || (!found))
1330                         {
1331                                 SpinRelease(masterLock);
1332 #ifdef USER_LOCKS
1333                                 elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1334 #else
1335                                 elog(NOTICE, "LockReplace: cannot remove lock from HTAB");
1336 #endif
1337                                 return (FALSE);
1338                         }
1339                 }
1340                 else
1341                 {
1342                         /* --------------------
1343                          * Wake the first waiting process and grant him the lock if it
1344                          * doesn't conflict.  The woken process must record the lock
1345                          * him/herself.
1346                          * --------------------
1347                          */
1348                         waitQueue = &(lock->waitProcs);
1349                         ProcLockWakeup(waitQueue, (char *) ltable, (char *) lock);
1350                 }
1351
1352 #ifdef USER_LOCKS
1353 next_item:
1354 #endif
1355                 if (done)
1356                         break;
1357                 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1358                 xidLook = tmp;
1359         }
1360         SpinRelease(masterLock);
1361 #ifdef USER_LOCKS
1362
1363         /*
1364          * Reinitialize the queue only if nothing has been left in it.
1365          */
1366         if (nskip == 0)
1367 #endif
1368                 SHMQueueInit(lockQueue);
1369         return TRUE;
1370 }
1371
1372 int
1373 LockShmemSize()
1374 {
1375         int                     size = 0;
1376         int                     nLockBuckets,
1377                                 nLockSegs;
1378         int                     nXidBuckets,
1379                                 nXidSegs;
1380
1381         nLockBuckets = 1 << (int) my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1);
1382         nLockSegs = 1 << (int) my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1383
1384         nXidBuckets = 1 << (int) my_log2((NLOCKS_PER_XACT - 1) / DEF_FFACTOR + 1);
1385         nXidSegs = 1 << (int) my_log2((nXidBuckets - 1) / DEF_SEGSIZE + 1);
1386
1387         size += MAXALIGN(NBACKENDS * sizeof(PROC)); /* each MyProc */
1388         size += MAXALIGN(NBACKENDS * sizeof(LOCKCTL));          /* each ltable->ctl */
1389         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1390
1391         size += MAXALIGN(my_log2(NLOCKENTS) * sizeof(void *));
1392         size += MAXALIGN(sizeof(HHDR));
1393         size += nLockSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1394         size += NLOCKENTS *                     /* XXX not multiple of BUCKET_ALLOC_INCR? */
1395                 (MAXALIGN(sizeof(BUCKET_INDEX)) +
1396                  MAXALIGN(sizeof(LOCK)));               /* contains hash key */
1397
1398         size += MAXALIGN(my_log2(NBACKENDS) * sizeof(void *));
1399         size += MAXALIGN(sizeof(HHDR));
1400         size += nXidSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1401         size += NBACKENDS *                     /* XXX not multiple of BUCKET_ALLOC_INCR? */
1402                 (MAXALIGN(sizeof(BUCKET_INDEX)) +
1403                  MAXALIGN(sizeof(XIDLookupEnt)));               /* contains hash key */
1404
1405         return size;
1406 }
1407
1408 /* -----------------
1409  * Boolean function to determine current locking status
1410  * -----------------
1411  */
1412 bool
1413 LockingDisabled()
1414 {
1415         return LockingIsDisabled;
1416 }
1417
1418 #ifdef DEADLOCK_DEBUG
1419 /*
1420  * Dump all locks. Must have already acquired the masterLock.
1421  */
1422 void
1423 DumpLocks()
1424 {
1425         SHMEM_OFFSET location;
1426         PROC       *proc;
1427         SHM_QUEUE  *lockQueue;
1428         int                     done;
1429         XIDLookupEnt *xidLook = NULL;
1430         XIDLookupEnt *tmp = NULL;
1431         SHMEM_OFFSET end;
1432         SPINLOCK        masterLock;
1433         int                     nLockTypes;
1434         LOCK       *lock;
1435         int                     count;
1436         int                     tableId = 1;
1437         LOCKTAB    *ltable;
1438
1439         ShmemPIDLookup(MyProcPid, &location);
1440         if (location == INVALID_OFFSET)
1441                 return;
1442         proc = (PROC *) MAKE_PTR(location);
1443         if (proc != MyProc)
1444                 return;
1445         lockQueue = &proc->lockQueue;
1446
1447         Assert(tableId < NumTables);
1448         ltable = AllTables[tableId];
1449         if (!ltable)
1450                 return;
1451
1452         nLockTypes = ltable->ctl->nLockTypes;
1453         masterLock = ltable->ctl->masterLock;
1454
1455         if (SHMQueueEmpty(lockQueue))
1456                 return;
1457
1458         SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1459         end = MAKE_OFFSET(lockQueue);
1460
1461         LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
1462         XID_PRINT("DumpLocks", xidLook);
1463
1464         for (count = 0;;)
1465         {
1466                 /* ---------------------------
1467                  * XXX Here we assume the shared memory queue is circular and
1468                  * that we know its internal structure.  Should have some sort of
1469                  * macros to allow one to walk it.      mer 20 July 1991
1470                  * ---------------------------
1471                  */
1472                 done = (xidLook->queue.next == end);
1473                 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1474
1475                 LOCK_DUMP("DumpLocks", lock, 0);
1476
1477                 if (count++ > 2000)
1478                 {
1479                         elog(NOTICE, "DumpLocks: xid loop detected, giving up");
1480                         break;
1481                 }
1482
1483                 if (done)
1484                         break;
1485                 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1486                 xidLook = tmp;
1487         }
1488 }
1489
1490 #endif