]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Rename locking structure names to be clearer. Add narrative to
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c--
4  *        simple lock acquisition
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.32 1998/06/30 02:33:31 momjian Exp $
11  *
12  * NOTES
13  *        Outside modules can create a lock table and acquire/release
14  *        locks.  A lock table is a shared memory hash table.  When
15  *        a process tries to acquire a lock of a type that conflictRs
16  *        with existing locks, it is put to sleep using the routines
17  *        in storage/lmgr/proc.c.
18  *
19  *      Interface:
20  *
21  *      LockAcquire(), LockRelease(), LockMethodTableInit().
22  *
23  *      LockReplace() is called only within this module and by the
24  *              lkchain module.  It releases a lock without looking
25  *              the lock up in the lock table.
26  *
27  *      NOTE: This module is used to define new lock tables.  The
28  *              multi-level lock table (multi.c) used by the heap
29  *              access methods calls these routines.  See multi.c for
30  *              examples showing how to use this interface.
31  *
32  *-------------------------------------------------------------------------
33  */
34 #include <stdio.h>                              /* for sprintf() */
35 #include <string.h>
36 #include <sys/types.h>
37 #include <unistd.h>
38
39 #include "postgres.h"
40 #include "miscadmin.h"
41 #include "storage/shmem.h"
42 #include "storage/sinvaladt.h"
43 #include "storage/spin.h"
44 #include "storage/proc.h"
45 #include "storage/lock.h"
46 #include "utils/dynahash.h"
47 #include "utils/hsearch.h"
48 #include "utils/memutils.h"
49 #include "utils/palloc.h"
50 #include "access/xact.h"
51 #include "access/transam.h"
52
53 static int
54 WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
55
56 /*#define LOCK_MGR_DEBUG*/
57
58 #ifndef LOCK_MGR_DEBUG
59
60 #define LOCK_PRINT(where,tag,type)
61 #define LOCK_DUMP(where,lock,type)
62 #define LOCK_DUMP_AUX(where,lock,type)
63 #define XID_PRINT(where,xidentP)
64
65 #else                                                   /* LOCK_MGR_DEBUG */
66
67 int                     lockDebug = 0;
68 unsigned int lock_debug_oid_min = BootstrapObjectIdData;
69 static char *lock_types[] = {
70         "NONE",
71         "WRITE",
72         "READ",
73         "WRITE INTENT",
74         "READ INTENT",
75         "EXTEND"
76 };
77
78 #define LOCK_PRINT(where,tag,type)\
79         if ((lockDebug >= 1) && (tag->relId >= lock_debug_oid_min)) \
80                 elog(DEBUG, \
81                          "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) type (%s)",where, \
82                          MyProcPid,\
83                          tag->relId, tag->dbId, \
84                          ((tag->tupleId.ip_blkid.bi_hi<<16)+\
85                           tag->tupleId.ip_blkid.bi_lo),\
86                          tag->tupleId.ip_posid, \
87                          lock_types[type])
88
89 #define LOCK_DUMP(where,lock,type)\
90         if ((lockDebug >= 1) && (lock->tag.relId >= lock_debug_oid_min)) \
91                 LOCK_DUMP_AUX(where,lock,type)
92
93 #define LOCK_DUMP_AUX(where,lock,type)\
94                 elog(DEBUG, \
95                          "%s: pid (%d) rel (%d) dbid (%d) tid (%d,%d) nHolding (%d) "\
96                          "holders (%d,%d,%d,%d,%d) type (%s)",where, \
97                          MyProcPid,\
98                          lock->tag.relId, lock->tag.dbId, \
99                          ((lock->tag.tupleId.ip_blkid.bi_hi<<16)+\
100                           lock->tag.tupleId.ip_blkid.bi_lo),\
101                          lock->tag.tupleId.ip_posid, \
102                          lock->nHolding,\
103                          lock->holders[1],\
104                          lock->holders[2],\
105                          lock->holders[3],\
106                          lock->holders[4],\
107                          lock->holders[5],\
108                          lock_types[type])
109
110 #define XID_PRINT(where,xidentP)\
111         if ((lockDebug >= 2) && \
112                 (((LOCK *)MAKE_PTR(xidentP->tag.lock))->tag.relId \
113                  >= lock_debug_oid_min)) \
114                 elog(DEBUG,\
115                          "%s: pid (%d) xid (%d) pid (%d) lock (%x) nHolding (%d) "\
116                          "holders (%d,%d,%d,%d,%d)",\
117                          where,\
118                          MyProcPid,\
119                          xidentP->tag.xid,\
120                          xidentP->tag.pid,\
121                          xidentP->tag.lock,\
122                          xidentP->nHolding,\
123                          xidentP->holders[1],\
124                          xidentP->holders[2],\
125                          xidentP->holders[3],\
126                          xidentP->holders[4],\
127                          xidentP->holders[5])
128
129 #endif                                                  /* LOCK_MGR_DEBUG */
130
131 SPINLOCK        LockMgrLock;            /* in Shmem or created in
132                                                                  * CreateSpinlocks() */
133
134 /* This is to simplify/speed up some bit arithmetic */
135
136 static MASK BITS_OFF[MAX_LOCKMODES];
137 static MASK BITS_ON[MAX_LOCKMODES];
138
139 /* -----------------
140  * XXX Want to move this to this file
141  * -----------------
142  */
143 static bool LockingIsDisabled;
144
145 /* -------------------
146  * map from lockmethod to the lock table structure
147  * -------------------
148  */
149 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
150
151 static int      NumLockMethods;
152
153 /* -------------------
154  * InitLocks -- Init the lock module.  Create a private data
155  *              structure for constructing conflict masks.
156  * -------------------
157  */
158 void
159 InitLocks()
160 {
161         int                     i;
162         int                     bit;
163
164         bit = 1;
165         /* -------------------
166          * remember 0th lockmode is invalid
167          * -------------------
168          */
169         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
170         {
171                 BITS_ON[i] = bit;
172                 BITS_OFF[i] = ~bit;
173         }
174 }
175
176 /* -------------------
177  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
178  * ------------------
179  */
180 void
181 LockDisable(int status)
182 {
183         LockingIsDisabled = status;
184 }
185
186
187 /*
188  * LockMethodInit -- initialize the lock table's lock type
189  *              structures
190  *
191  * Notes: just copying.  Should only be called once.
192  */
193 static void
194 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
195                          MASK *conflictsP,
196                          int *prioP,
197                          int numModes)
198 {
199         int                     i;
200
201         lockMethodTable->ctl->numLockModes = numModes;
202         numModes++;
203         for (i = 0; i < numModes; i++, prioP++, conflictsP++)
204         {
205                 lockMethodTable->ctl->conflictTab[i] = *conflictsP;
206                 lockMethodTable->ctl->prio[i] = *prioP;
207         }
208 }
209
210 /*
211  * LockMethodTableInit -- initialize a lock table structure
212  *
213  * Notes:
214  *              (a) a lock table has four separate entries in the shmem index
215  *              table.  This is because every shared hash table and spinlock
216  *              has its name stored in the shmem index at its creation.  It
217  *              is wasteful, in this case, but not much space is involved.
218  *
219  */
220 LOCKMETHOD
221 LockMethodTableInit(char *tabName,
222                         MASK *conflictsP,
223                         int *prioP,
224                         int numModes)
225 {
226         LOCKMETHODTABLE    *lockMethodTable;
227         char       *shmemName;
228         HASHCTL         info;
229         int                     hash_flags;
230         bool            found;
231         int                     status = TRUE;
232
233         if (numModes > MAX_LOCKMODES)
234         {
235                 elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
236                          numModes, MAX_LOCKMODES);
237                 return (INVALID_TABLEID);
238         }
239
240         /* allocate a string for the shmem index table lookup */
241         shmemName = (char *) palloc((unsigned) (strlen(tabName) + 32));
242         if (!shmemName)
243         {
244                 elog(NOTICE, "LockMethodTableInit: couldn't malloc string %s \n", tabName);
245                 return (INVALID_TABLEID);
246         }
247
248         /* each lock table has a non-shared header */
249         lockMethodTable = (LOCKMETHODTABLE *) palloc((unsigned) sizeof(LOCKMETHODTABLE));
250         if (!lockMethodTable)
251         {
252                 elog(NOTICE, "LockMethodTableInit: couldn't malloc lock table %s\n", tabName);
253                 pfree(shmemName);
254                 return (INVALID_TABLEID);
255         }
256
257         /* ------------------------
258          * find/acquire the spinlock for the table
259          * ------------------------
260          */
261         SpinAcquire(LockMgrLock);
262
263
264         /* -----------------------
265          * allocate a control structure from shared memory or attach to it
266          * if it already exists.
267          * -----------------------
268          */
269         sprintf(shmemName, "%s (ctl)", tabName);
270         lockMethodTable->ctl = (LOCKMETHODCTL *)
271                 ShmemInitStruct(shmemName, (unsigned) sizeof(LOCKMETHODCTL), &found);
272
273         if (!lockMethodTable->ctl)
274         {
275                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
276                 status = FALSE;
277         }
278
279         /* -------------------
280          * no zero-th table
281          * -------------------
282          */
283         NumLockMethods = 1;
284
285         /* ----------------
286          * we're first - initialize
287          * ----------------
288          */
289         if (!found)
290         {
291                 MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
292                 lockMethodTable->ctl->masterLock = LockMgrLock;
293                 lockMethodTable->ctl->lockmethod = NumLockMethods;
294         }
295
296         /* --------------------
297          * other modules refer to the lock table by a lockmethod
298          * --------------------
299          */
300         LockMethodTable[NumLockMethods] = lockMethodTable;
301         NumLockMethods++;
302         Assert(NumLockMethods <= MAX_LOCK_METHODS);
303
304         /* ----------------------
305          * allocate a hash table for the lock tags.  This is used
306          * to find the different locks.
307          * ----------------------
308          */
309         info.keysize = sizeof(LOCKTAG);
310         info.datasize = sizeof(LOCK);
311         info.hash = tag_hash;
312         hash_flags = (HASH_ELEM | HASH_FUNCTION);
313
314         sprintf(shmemName, "%s (lock hash)", tabName);
315         lockMethodTable->lockHash = (HTAB *) ShmemInitHash(shmemName,
316                                                                                  INIT_TABLE_SIZE, MAX_TABLE_SIZE,
317                                                                                           &info, hash_flags);
318
319         Assert(lockMethodTable->lockHash->hash == tag_hash);
320         if (!lockMethodTable->lockHash)
321         {
322                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
323                 status = FALSE;
324         }
325
326         /* -------------------------
327          * allocate an xid table.  When different transactions hold
328          * the same lock, additional information must be saved (locks per tx).
329          * -------------------------
330          */
331         info.keysize = XID_TAGSIZE;
332         info.datasize = sizeof(XIDLookupEnt);
333         info.hash = tag_hash;
334         hash_flags = (HASH_ELEM | HASH_FUNCTION);
335
336         sprintf(shmemName, "%s (xid hash)", tabName);
337         lockMethodTable->xidHash = (HTAB *) ShmemInitHash(shmemName,
338                                                                                  INIT_TABLE_SIZE, MAX_TABLE_SIZE,
339                                                                                          &info, hash_flags);
340
341         if (!lockMethodTable->xidHash)
342         {
343                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
344                 status = FALSE;
345         }
346
347         /* init ctl data structures */
348         LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
349
350         SpinRelease(LockMgrLock);
351
352         pfree(shmemName);
353
354         if (status)
355                 return (lockMethodTable->ctl->lockmethod);
356         else
357                 return (INVALID_TABLEID);
358 }
359
360 /*
361  * LockMethodTableRename -- allocate another lockmethod to the same
362  *              lock table.
363  *
364  * NOTES: Both the lock module and the lock chain (lchain.c)
365  *              module use table id's to distinguish between different
366  *              kinds of locks.  Short term and long term locks look
367  *              the same to the lock table, but are handled differently
368  *              by the lock chain manager.      This function allows the
369  *              client to use different lockmethods when acquiring/releasing
370  *              short term and long term locks.
371  */
372 #ifdef NOT_USED
373 LOCKMETHOD
374 LockMethodTableRename(LOCKMETHOD lockmethod)
375 {
376         LOCKMETHOD newLockMethod;
377
378         if (NumLockMethods >= MAX_LOCK_METHODS)
379                 return (INVALID_TABLEID);
380         if (LockMethodTable[lockmethod] == INVALID_TABLEID)
381                 return (INVALID_TABLEID);
382
383         /* other modules refer to the lock table by a lockmethod */
384         newLockMethod = NumLockMethods;
385         NumLockMethods++;
386
387         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
388         return (newLockMethod);
389 }
390 #endif
391
392 /*
393  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
394  *              set lock if/when no conflicts.
395  *
396  * Returns: TRUE if parameters are correct, FALSE otherwise.
397  *
398  * Side Effects: The lock is always acquired.  No way to abort
399  *              a lock acquisition other than aborting the transaction.
400  *              Lock is recorded in the lkchain.
401 #ifdef USER_LOCKS
402  * Note on User Locks:
403  *              User locks are handled totally on the application side as
404  *              long term cooperative locks which extend beyond the normal
405  *              transaction boundaries.  Their purpose is to indicate to an
406  *              application that someone is `working' on an item.  So it is
407  *              possible to put an user lock on a tuple's oid, retrieve the
408  *              tuple, work on it for an hour and then update it and remove
409  *              the lock.  While the lock is active other clients can still
410  *              read and write the tuple but they can be aware that it has
411  *              been locked at the application level by someone.
412  *              User locks use lock tags made of an uint16 and an uint32, for
413  *              example 0 and a tuple oid, or any other arbitrary pair of
414  *              numbers following a convention established by the application.
415  *              In this sense tags don't refer to tuples or database entities.
416  *              User locks and normal locks are completely orthogonal and
417  *              they don't interfere with each other, so it is possible
418  *              to acquire a normal lock on an user-locked tuple or user-lock
419  *              a tuple for which a normal write lock already exists.
420  *              User locks are always non blocking, therefore they are never
421  *              acquired if already held by another process.  They must be
422  *              released explicitly by the application but they are released
423  *              automatically when a backend terminates.
424  *              They are indicated by a dummy lockmethod 0 which doesn't have
425  *              any table allocated but uses the normal lock table, and are
426  *              distinguished from normal locks for the following differences:
427  *
428  *                                                                              normal lock             user lock
429  *
430  *              lockmethod                                              1                               0
431  *              tag.relId                                               rel oid                 0
432  *              tag.ItemPointerData.ip_blkid    block id                lock id2
433  *              tag.ItemPointerData.ip_posid    tuple offset    lock id1
434  *              xid.pid                                                 0                               backend pid
435  *              xid.xid                                                 current xid             0
436  *              persistence                                             transaction             user or backend
437  *
438  *              The lockmode parameter can have the same values for normal locks
439  *              although probably only WRITE_LOCK can have some practical use.
440  *
441  *                                                                                                              DZ - 4 Oct 1996
442 #endif
443  */
444
445 bool
446 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
447 {
448         XIDLookupEnt *result,
449                                 item;
450         HTAB       *xidTable;
451         bool            found;
452         LOCK       *lock = NULL;
453         SPINLOCK        masterLock;
454         LOCKMETHODTABLE    *lockMethodTable;
455         int                     status;
456         TransactionId myXid;
457
458 #ifdef USER_LOCKS
459         int                     is_user_lock;
460
461         is_user_lock = (lockmethod == 0);
462         if (is_user_lock)
463         {
464                 lockmethod = 1;
465 #ifdef USER_LOCKS_DEBUG
466                 elog(NOTICE, "LockAcquire: user lock tag [%u,%u] %d",
467                          locktag->tupleId.ip_posid,
468                          ((locktag->tupleId.ip_blkid.bi_hi << 16) +
469                           locktag->tupleId.ip_blkid.bi_lo),
470                          lockmode);
471 #endif
472         }
473 #endif
474
475         Assert(lockmethod < NumLockMethods);
476         lockMethodTable = LockMethodTable[lockmethod];
477         if (!lockMethodTable)
478         {
479                 elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
480                 return (FALSE);
481         }
482
483         if (LockingIsDisabled)
484                 return (TRUE);
485
486         LOCK_PRINT("Acquire", locktag, lockmode);
487         masterLock = lockMethodTable->ctl->masterLock;
488
489         SpinAcquire(masterLock);
490
491         Assert(lockMethodTable->lockHash->hash == tag_hash);
492         lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_ENTER, &found);
493
494         if (!lock)
495         {
496                 SpinRelease(masterLock);
497                 elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
498                 return (FALSE);
499         }
500
501         /* --------------------
502          * if there was nothing else there, complete initialization
503          * --------------------
504          */
505         if (!found)
506         {
507                 lock->mask = 0;
508                 ProcQueueInit(&(lock->waitProcs));
509                 MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
510                 MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
511                 lock->nHolding = 0;
512                 lock->nActive = 0;
513
514                 Assert(BlockIdEquals(&(lock->tag.tupleId.ip_blkid),
515                                                          &(locktag->tupleId.ip_blkid)));
516
517         }
518
519         /* ------------------
520          * add an element to the lock queue so that we can clear the
521          * locks at end of transaction.
522          * ------------------
523          */
524         xidTable = lockMethodTable->xidHash;
525         myXid = GetCurrentTransactionId();
526
527         /* ------------------
528          * Zero out all of the tag bytes (this clears the padding bytes for long
529          * word alignment and ensures hashing consistency).
530          * ------------------
531          */
532         MemSet(&item, 0, XID_TAGSIZE);          /* must clear padding, needed */
533         TransactionIdStore(myXid, &item.tag.xid);
534         item.tag.lock = MAKE_OFFSET(lock);
535 #if 0
536         item.tag.pid = MyPid;
537 #endif
538
539 #ifdef USER_LOCKS
540         if (is_user_lock)
541         {
542                 item.tag.pid = MyProcPid;
543                 item.tag.xid = myXid = 0;
544 #ifdef USER_LOCKS_DEBUG
545                 elog(NOTICE, "LockAcquire: user lock xid [%d,%d,%d]",
546                          item.tag.lock, item.tag.pid, item.tag.xid);
547 #endif
548         }
549 #endif
550
551         result = (XIDLookupEnt *) hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found);
552         if (!result)
553         {
554                 elog(NOTICE, "LockAcquire: xid table corrupted");
555                 return (STATUS_ERROR);
556         }
557         if (!found)
558         {
559                 XID_PRINT("LockAcquire: queueing XidEnt", result);
560                 ProcAddLock(&result->queue);
561                 result->nHolding = 0;
562                 MemSet((char *) result->holders, 0, sizeof(int) * MAX_LOCKMODES);
563         }
564
565         /* ----------------
566          * lock->nholding tells us how many processes have _tried_ to
567          * acquire this lock,  Regardless of whether they succeeded or
568          * failed in doing so.
569          * ----------------
570          */
571         lock->nHolding++;
572         lock->holders[lockmode]++;
573
574         /* --------------------
575          * If I'm the only one holding a lock, then there
576          * cannot be a conflict.  Need to subtract one from the
577          * lock's count since we just bumped the count up by 1
578          * above.
579          * --------------------
580          */
581         if (result->nHolding == lock->nActive)
582         {
583                 result->holders[lockmode]++;
584                 result->nHolding++;
585                 GrantLock(lock, lockmode);
586                 SpinRelease(masterLock);
587                 return (TRUE);
588         }
589
590         Assert(result->nHolding <= lock->nActive);
591
592         status = LockResolveConflicts(lockmethod, lock, lockmode, myXid);
593
594         if (status == STATUS_OK)
595                 GrantLock(lock, lockmode);
596         else if (status == STATUS_FOUND)
597         {
598 #ifdef USER_LOCKS
599
600                 /*
601                  * User locks are non blocking. If we can't acquire a lock remove
602                  * the xid entry and return FALSE without waiting.
603                  */
604                 if (is_user_lock)
605                 {
606                         if (!result->nHolding)
607                         {
608                                 SHMQueueDelete(&result->queue);
609                                 hash_search(xidTable, (Pointer) &item, HASH_REMOVE, &found);
610                         }
611                         lock->nHolding--;
612                         lock->holders[lockmode]--;
613                         SpinRelease(masterLock);
614 #ifdef USER_LOCKS_DEBUG
615                         elog(NOTICE, "LockAcquire: user lock failed");
616 #endif
617                         return (FALSE);
618                 }
619 #endif
620                 status = WaitOnLock(lockmethod, lock, lockmode);
621                 XID_PRINT("Someone granted me the lock", result);
622         }
623
624         SpinRelease(masterLock);
625
626         return (status == STATUS_OK);
627 }
628
629 /* ----------------------------
630  * LockResolveConflicts -- test for lock conflicts
631  *
632  * NOTES:
633  *              Here's what makes this complicated: one transaction's
634  * locks don't conflict with one another.  When many processes
635  * hold locks, each has to subtract off the other's locks when
636  * determining whether or not any new lock acquired conflicts with
637  * the old ones.
638  *
639  *      For example, if I am already holding a WRITE_INTENT lock,
640  *      there will not be a conflict with my own READ_LOCK.  If I
641  *      don't consider the intent lock when checking for conflicts,
642  *      I find no conflict.
643  * ----------------------------
644  */
645 int
646 LockResolveConflicts(LOCKMETHOD lockmethod,
647                                          LOCK *lock,
648                                          LOCKMODE lockmode,
649                                          TransactionId xid)
650 {
651         XIDLookupEnt *result,
652                                 item;
653         int                *myHolders;
654         int                     numLockModes;
655         HTAB       *xidTable;
656         bool            found;
657         int                     bitmask;
658         int                     i,
659                                 tmpMask;
660
661         numLockModes = LockMethodTable[lockmethod]->ctl->numLockModes;
662         xidTable = LockMethodTable[lockmethod]->xidHash;
663
664         /* ---------------------
665          * read my own statistics from the xid table.  If there
666          * isn't an entry, then we'll just add one.
667          *
668          * Zero out the tag, this clears the padding bytes for long
669          * word alignment and ensures hashing consistency.
670          * ------------------
671          */
672         MemSet(&item, 0, XID_TAGSIZE);
673         TransactionIdStore(xid, &item.tag.xid);
674         item.tag.lock = MAKE_OFFSET(lock);
675 #if 0
676         item.tag.pid = pid;
677 #endif
678
679         if (!(result = (XIDLookupEnt *)
680                   hash_search(xidTable, (Pointer) &item, HASH_ENTER, &found)))
681         {
682                 elog(NOTICE, "LockResolveConflicts: xid table corrupted");
683                 return (STATUS_ERROR);
684         }
685         myHolders = result->holders;
686
687         if (!found)
688         {
689                 /* ---------------
690                  * we're not holding any type of lock yet.  Clear
691                  * the lock stats.
692                  * ---------------
693                  */
694                 MemSet(result->holders, 0, numLockModes * sizeof(*(lock->holders)));
695                 result->nHolding = 0;
696         }
697
698         {
699                 /* ------------------------
700                  * If someone with a greater priority is waiting for the lock,
701                  * do not continue and share the lock, even if we can.  bjm
702                  * ------------------------
703                  */
704                 int                     myprio = LockMethodTable[lockmethod]->ctl->prio[lockmode];
705                 PROC_QUEUE *waitQueue = &(lock->waitProcs);
706                 PROC       *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
707
708                 if (waitQueue->size && topproc->prio > myprio)
709                         return STATUS_FOUND;
710         }
711
712         /* ----------------------------
713          * first check for global conflicts: If no locks conflict
714          * with mine, then I get the lock.
715          *
716          * Checking for conflict: lock->mask represents the types of
717          * currently held locks.  conflictTable[lockmode] has a bit
718          * set for each type of lock that conflicts with mine.  Bitwise
719          * compare tells if there is a conflict.
720          * ----------------------------
721          */
722         if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & lock->mask))
723         {
724
725                 result->holders[lockmode]++;
726                 result->nHolding++;
727
728                 XID_PRINT("Conflict Resolved: updated xid entry stats", result);
729
730                 return (STATUS_OK);
731         }
732
733         /* ------------------------
734          * Rats.  Something conflicts. But it could still be my own
735          * lock.  We have to construct a conflict mask
736          * that does not reflect our own locks.
737          * ------------------------
738          */
739         bitmask = 0;
740         tmpMask = 2;
741         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
742         {
743                 if (lock->activeHolders[i] != myHolders[i])
744                         bitmask |= tmpMask;
745         }
746
747         /* ------------------------
748          * now check again for conflicts.  'bitmask' describes the types
749          * of locks held by other processes.  If one of these
750          * conflicts with the kind of lock that I want, there is a
751          * conflict and I have to sleep.
752          * ------------------------
753          */
754         if (!(LockMethodTable[lockmethod]->ctl->conflictTab[lockmode] & bitmask))
755         {
756
757                 /* no conflict. Get the lock and go on */
758
759                 result->holders[lockmode]++;
760                 result->nHolding++;
761
762                 XID_PRINT("Conflict Resolved: updated xid entry stats", result);
763
764                 return (STATUS_OK);
765
766         }
767
768         return (STATUS_FOUND);
769 }
770
771 static int
772 WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
773 {
774         PROC_QUEUE *waitQueue = &(lock->waitProcs);
775         LOCKMETHODTABLE    *lockMethodTable = LockMethodTable[lockmethod];
776         int                     prio = lockMethodTable->ctl->prio[lockmode];
777
778         Assert(lockmethod < NumLockMethods);
779
780         /*
781          * the waitqueue is ordered by priority. I insert myself according to
782          * the priority of the lock I am acquiring.
783          *
784          * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
785          * synchronization for this queue.      That will not be true if/when
786          * people can be deleted from the queue by a SIGINT or something.
787          */
788         LOCK_DUMP_AUX("WaitOnLock: sleeping on lock", lock, lockmode);
789         if (ProcSleep(waitQueue,
790                                   lockMethodTable->ctl->masterLock,
791                                   lockmode,
792                                   prio,
793                                   lock) != NO_ERROR)
794         {
795                 /* -------------------
796                  * This could have happend as a result of a deadlock, see HandleDeadLock()
797                  * Decrement the lock nHolding and holders fields as we are no longer
798                  * waiting on this lock.
799                  * -------------------
800                  */
801                 lock->nHolding--;
802                 lock->holders[lockmode]--;
803                 LOCK_DUMP_AUX("WaitOnLock: aborting on lock", lock, lockmode);
804                 SpinRelease(lockMethodTable->ctl->masterLock);
805                 elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
806         }
807
808         LOCK_DUMP_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
809         return (STATUS_OK);
810 }
811
812 /*
813  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
814  *              release it.
815  *
816  * Side Effects: if the lock no longer conflicts with the highest
817  *              priority waiting process, that process is granted the lock
818  *              and awoken. (We have to grant the lock here to avoid a
819  *              race between the waking process and any new process to
820  *              come along and request the lock).
821  */
822 bool
823 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
824 {
825         LOCK       *lock = NULL;
826         SPINLOCK        masterLock;
827         bool            found;
828         LOCKMETHODTABLE    *lockMethodTable;
829         XIDLookupEnt *result,
830                                 item;
831         HTAB       *xidTable;
832         bool            wakeupNeeded = true;
833
834 #ifdef USER_LOCKS
835         int                     is_user_lock;
836
837         is_user_lock = (lockmethod == 0);
838         if (is_user_lock)
839         {
840                 lockmethod = 1;
841 #ifdef USER_LOCKS_DEBUG
842                 elog(NOTICE, "LockRelease: user lock tag [%u,%u] %d",
843                          locktag->tupleId.ip_posid,
844                          ((locktag->tupleId.ip_blkid.bi_hi << 16) +
845                           locktag->tupleId.ip_blkid.bi_lo),
846                          lockmode);
847 #endif
848         }
849 #endif
850
851         Assert(lockmethod < NumLockMethods);
852         lockMethodTable = LockMethodTable[lockmethod];
853         if (!lockMethodTable)
854         {
855                 elog(NOTICE, "lockMethodTable is null in LockRelease");
856                 return (FALSE);
857         }
858
859         if (LockingIsDisabled)
860                 return (TRUE);
861
862         LOCK_PRINT("Release", locktag, lockmode);
863
864         masterLock = lockMethodTable->ctl->masterLock;
865         xidTable = lockMethodTable->xidHash;
866
867         SpinAcquire(masterLock);
868
869         Assert(lockMethodTable->lockHash->hash == tag_hash);
870         lock = (LOCK *)
871                 hash_search(lockMethodTable->lockHash, (Pointer) locktag, HASH_FIND_SAVE, &found);
872
873 #ifdef USER_LOCKS
874
875         /*
876          * If the entry is not found hash_search returns TRUE instead of NULL,
877          * so we must check it explicitly.
878          */
879         if ((is_user_lock) && (lock == (LOCK *) TRUE))
880         {
881                 SpinRelease(masterLock);
882                 elog(NOTICE, "LockRelease: there are no locks with this tag");
883                 return (FALSE);
884         }
885 #endif
886
887         /*
888          * let the caller print its own error message, too. Do not
889          * elog(ERROR).
890          */
891         if (!lock)
892         {
893                 SpinRelease(masterLock);
894                 elog(NOTICE, "LockRelease: locktable corrupted");
895                 return (FALSE);
896         }
897
898         if (!found)
899         {
900                 SpinRelease(masterLock);
901                 elog(NOTICE, "LockRelease: locktable lookup failed, no lock");
902                 return (FALSE);
903         }
904
905         Assert(lock->nHolding > 0);
906
907 #ifdef USER_LOCKS
908
909         /*
910          * If this is an user lock it can be removed only after checking that
911          * it was acquired by the current process, so this code is skipped and
912          * executed later.
913          */
914         if (!is_user_lock)
915         {
916 #endif
917
918                 /*
919                  * fix the general lock stats
920                  */
921                 lock->nHolding--;
922                 lock->holders[lockmode]--;
923                 lock->nActive--;
924                 lock->activeHolders[lockmode]--;
925
926                 Assert(lock->nActive >= 0);
927
928                 if (!lock->nHolding)
929                 {
930                         /* ------------------
931                          * if there's no one waiting in the queue,
932                          * we just released the last lock.
933                          * Delete it from the lock table.
934                          * ------------------
935                          */
936                         Assert(lockMethodTable->lockHash->hash == tag_hash);
937                         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
938                                                                                 (Pointer) &(lock->tag),
939                                                                                 HASH_REMOVE_SAVED,
940                                                                                 &found);
941                         Assert(lock && found);
942                         wakeupNeeded = false;
943                 }
944 #ifdef USER_LOCKS
945         }
946 #endif
947
948         /* ------------------
949          * Zero out all of the tag bytes (this clears the padding bytes for long
950          * word alignment and ensures hashing consistency).
951          * ------------------
952          */
953         MemSet(&item, 0, XID_TAGSIZE);
954
955         TransactionIdStore(GetCurrentTransactionId(), &item.tag.xid);
956         item.tag.lock = MAKE_OFFSET(lock);
957 #if 0
958         item.tag.pid = MyPid;
959 #endif
960
961 #ifdef USER_LOCKS
962         if (is_user_lock)
963         {
964                 item.tag.pid = MyProcPid;
965                 item.tag.xid = 0;
966 #ifdef USER_LOCKS_DEBUG
967                 elog(NOTICE, "LockRelease: user lock xid [%d,%d,%d]",
968                          item.tag.lock, item.tag.pid, item.tag.xid);
969 #endif
970         }
971 #endif
972
973         if (!(result = (XIDLookupEnt *) hash_search(xidTable,
974                                                                                                 (Pointer) &item,
975                                                                                                 HASH_FIND_SAVE,
976                                                                                                 &found))
977                 || !found)
978         {
979                 SpinRelease(masterLock);
980 #ifdef USER_LOCKS
981                 if ((is_user_lock) && (result))
982                         elog(NOTICE, "LockRelease: you don't have a lock on this tag");
983                 else
984                         elog(NOTICE, "LockRelease: find xid, table corrupted");
985 #else
986                 elog(NOTICE, "LockReplace: xid table corrupted");
987 #endif
988                 return (FALSE);
989         }
990
991         /*
992          * now check to see if I have any private locks.  If I do, decrement
993          * the counts associated with them.
994          */
995         result->holders[lockmode]--;
996         result->nHolding--;
997
998         XID_PRINT("LockRelease updated xid stats", result);
999
1000         /*
1001          * If this was my last hold on this lock, delete my entry in the XID
1002          * table.
1003          */
1004         if (!result->nHolding)
1005         {
1006 #ifdef USER_LOCKS
1007                 if (result->queue.prev == INVALID_OFFSET)
1008                         elog(NOTICE, "LockRelease: xid.prev == INVALID_OFFSET");
1009                 if (result->queue.next == INVALID_OFFSET)
1010                         elog(NOTICE, "LockRelease: xid.next == INVALID_OFFSET");
1011 #endif
1012                 if (result->queue.next != INVALID_OFFSET)
1013                         SHMQueueDelete(&result->queue);
1014                 if (!(result = (XIDLookupEnt *)
1015                           hash_search(xidTable, (Pointer) &item, HASH_REMOVE_SAVED, &found)) ||
1016                         !found)
1017                 {
1018                         SpinRelease(masterLock);
1019 #ifdef USER_LOCKS
1020                         elog(NOTICE, "LockRelease: remove xid, table corrupted");
1021 #else
1022                         elog(NOTICE, "LockReplace: xid table corrupted");
1023 #endif
1024                         return (FALSE);
1025                 }
1026         }
1027
1028 #ifdef USER_LOCKS
1029
1030         /*
1031          * If this is an user lock remove it now, after the corresponding xid
1032          * entry has been found and deleted.
1033          */
1034         if (is_user_lock)
1035         {
1036
1037                 /*
1038                  * fix the general lock stats
1039                  */
1040                 lock->nHolding--;
1041                 lock->holders[lockmode]--;
1042                 lock->nActive--;
1043                 lock->activeHolders[lockmode]--;
1044
1045                 Assert(lock->nActive >= 0);
1046
1047                 if (!lock->nHolding)
1048                 {
1049                         /* ------------------
1050                          * if there's no one waiting in the queue,
1051                          * we just released the last lock.
1052                          * Delete it from the lock table.
1053                          * ------------------
1054                          */
1055                         Assert(lockMethodTable->lockHash->hash == tag_hash);
1056                         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1057                                                                                 (Pointer) &(lock->tag),
1058                                                                                 HASH_REMOVE,
1059                                                                                 &found);
1060                         Assert(lock && found);
1061                         wakeupNeeded = false;
1062                 }
1063         }
1064 #endif
1065
1066         /* --------------------------
1067          * If there are still active locks of the type I just released, no one
1068          * should be woken up.  Whoever is asleep will still conflict
1069          * with the remaining locks.
1070          * --------------------------
1071          */
1072         if (!(lock->activeHolders[lockmode]))
1073         {
1074                 /* change the conflict mask.  No more of this lock type. */
1075                 lock->mask &= BITS_OFF[lockmode];
1076         }
1077
1078         if (wakeupNeeded)
1079         {
1080                 /* --------------------------
1081                  * Wake the first waiting process and grant him the lock if it
1082                  * doesn't conflict.  The woken process must record the lock
1083                  * himself.
1084                  * --------------------------
1085                  */
1086                 ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
1087         }
1088
1089         SpinRelease(masterLock);
1090         return (TRUE);
1091 }
1092
1093 /*
1094  * GrantLock -- update the lock data structure to show
1095  *              the new lock holder.
1096  */
1097 void
1098 GrantLock(LOCK *lock, LOCKMODE lockmode)
1099 {
1100         lock->nActive++;
1101         lock->activeHolders[lockmode]++;
1102         lock->mask |= BITS_ON[lockmode];
1103 }
1104
1105 #ifdef USER_LOCKS
1106 /*
1107  * LockReleaseAll -- Release all locks in a process lock queue.
1108  *
1109  * Note: This code is a little complicated by the presence in the
1110  *               same queue of user locks which can't be removed from the
1111  *               normal lock queue at the end of a transaction. They must
1112  *               however be removed when the backend exits.
1113  *               A dummy lockmethod 0 is used to indicate that we are releasing
1114  *               the user locks, from the code added to ProcKill().
1115  */
1116 #endif
1117 bool
1118 LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
1119 {
1120         PROC_QUEUE *waitQueue;
1121         int                     done;
1122         XIDLookupEnt *xidLook = NULL;
1123         XIDLookupEnt *tmp = NULL;
1124         SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1125         SPINLOCK        masterLock;
1126         LOCKMETHODTABLE    *lockMethodTable;
1127         int                     i,
1128                                 numLockModes;
1129         LOCK       *lock;
1130         bool            found;
1131
1132 #ifdef USER_LOCKS
1133         int                     is_user_lock_table,
1134                                 count,
1135                                 nskip;
1136
1137         is_user_lock_table = (lockmethod == 0);
1138 #ifdef USER_LOCKS_DEBUG
1139         elog(NOTICE, "LockReleaseAll: lockmethod=%d, pid=%d", lockmethod, MyProcPid);
1140 #endif
1141         if (is_user_lock_table)
1142                 lockmethod = 1;
1143 #endif
1144
1145         Assert(lockmethod < NumLockMethods);
1146         lockMethodTable = LockMethodTable[lockmethod];
1147         if (!lockMethodTable)
1148                 return (FALSE);
1149
1150         numLockModes = lockMethodTable->ctl->numLockModes;
1151         masterLock = lockMethodTable->ctl->masterLock;
1152
1153         if (SHMQueueEmpty(lockQueue))
1154                 return TRUE;
1155
1156 #ifdef USER_LOCKS
1157         SpinAcquire(masterLock);
1158 #endif
1159         SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1160
1161         XID_PRINT("LockReleaseAll", xidLook);
1162
1163 #ifndef USER_LOCKS
1164         SpinAcquire(masterLock);
1165 #else
1166         count = nskip = 0;
1167 #endif
1168         for (;;)
1169         {
1170                 /* ---------------------------
1171                  * XXX Here we assume the shared memory queue is circular and
1172                  * that we know its internal structure.  Should have some sort of
1173                  * macros to allow one to walk it.      mer 20 July 1991
1174                  * ---------------------------
1175                  */
1176                 done = (xidLook->queue.next == end);
1177                 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1178
1179                 LOCK_PRINT("LockReleaseAll", (&lock->tag), 0);
1180
1181 #ifdef USER_LOCKS
1182
1183                 /*
1184                  * Sometimes the queue appears to be messed up.
1185                  */
1186                 if (count++ > 2000)
1187                 {
1188                         elog(NOTICE, "LockReleaseAll: xid loop detected, giving up");
1189                         nskip = 0;
1190                         break;
1191                 }
1192                 if (is_user_lock_table)
1193                 {
1194                         if ((xidLook->tag.pid == 0) || (xidLook->tag.xid != 0))
1195                         {
1196 #ifdef USER_LOCKS_DEBUG
1197                                 elog(NOTICE, "LockReleaseAll: skip normal lock [%d,%d,%d]",
1198                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1199 #endif
1200                                 nskip++;
1201                                 goto next_item;
1202                         }
1203                         if (xidLook->tag.pid != MyProcPid)
1204                         {
1205                                 /* This should never happen */
1206 #ifdef USER_LOCKS_DEBUG
1207                                 elog(NOTICE,
1208                                          "LockReleaseAll: skip other pid [%u,%u] [%d,%d,%d]",
1209                                          lock->tag.tupleId.ip_posid,
1210                                          ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1211                                           lock->tag.tupleId.ip_blkid.bi_lo),
1212                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1213 #endif
1214                                 nskip++;
1215                                 goto next_item;
1216                         }
1217 #ifdef USER_LOCKS_DEBUG
1218                         elog(NOTICE,
1219                                  "LockReleaseAll: release user lock [%u,%u] [%d,%d,%d]",
1220                                  lock->tag.tupleId.ip_posid,
1221                                  ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1222                                   lock->tag.tupleId.ip_blkid.bi_lo),
1223                                  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1224 #endif
1225                 }
1226                 else
1227                 {
1228                         if ((xidLook->tag.pid != 0) || (xidLook->tag.xid == 0))
1229                         {
1230 #ifdef USER_LOCKS_DEBUG
1231                                 elog(NOTICE,
1232                                          "LockReleaseAll: skip user lock [%u,%u] [%d,%d,%d]",
1233                                          lock->tag.tupleId.ip_posid,
1234                                          ((lock->tag.tupleId.ip_blkid.bi_hi << 16) +
1235                                           lock->tag.tupleId.ip_blkid.bi_lo),
1236                                   xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1237 #endif
1238                                 nskip++;
1239                                 goto next_item;
1240                         }
1241 #ifdef USER_LOCKS_DEBUG
1242                         elog(NOTICE, "LockReleaseAll: release normal lock [%d,%d,%d]",
1243                                  xidLook->tag.lock, xidLook->tag.pid, xidLook->tag.xid);
1244 #endif
1245                 }
1246 #endif
1247
1248                 /* ------------------
1249                  * fix the general lock stats
1250                  * ------------------
1251                  */
1252                 if (lock->nHolding != xidLook->nHolding)
1253                 {
1254                         lock->nHolding -= xidLook->nHolding;
1255                         lock->nActive -= xidLook->nHolding;
1256                         Assert(lock->nActive >= 0);
1257                         for (i = 1; i <= numLockModes; i++)
1258                         {
1259                                 lock->holders[i] -= xidLook->holders[i];
1260                                 lock->activeHolders[i] -= xidLook->holders[i];
1261                                 if (!lock->activeHolders[i])
1262                                         lock->mask &= BITS_OFF[i];
1263                         }
1264                 }
1265                 else
1266                 {
1267                         /* --------------
1268                          * set nHolding to zero so that we can garbage collect the lock
1269                          * down below...
1270                          * --------------
1271                          */
1272                         lock->nHolding = 0;
1273                 }
1274                 /* ----------------
1275                  * always remove the xidLookup entry, we're done with it now
1276                  * ----------------
1277                  */
1278 #ifdef USER_LOCKS
1279                 SHMQueueDelete(&xidLook->queue);
1280 #endif
1281                 if ((!hash_search(lockMethodTable->xidHash, (Pointer) xidLook, HASH_REMOVE, &found))
1282                         || !found)
1283                 {
1284                         SpinRelease(masterLock);
1285                         elog(NOTICE, "LockReleaseAll: xid table corrupted");
1286                         return (FALSE);
1287                 }
1288
1289                 if (!lock->nHolding)
1290                 {
1291                         /* --------------------
1292                          * if there's no one waiting in the queue, we've just released
1293                          * the last lock.
1294                          * --------------------
1295                          */
1296
1297                         Assert(lockMethodTable->lockHash->hash == tag_hash);
1298                         lock = (LOCK *)
1299                                 hash_search(lockMethodTable->lockHash, (Pointer) &(lock->tag), HASH_REMOVE, &found);
1300                         if ((!lock) || (!found))
1301                         {
1302                                 SpinRelease(masterLock);
1303                                 elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1304                                 return (FALSE);
1305                         }
1306                 }
1307                 else
1308                 {
1309                         /* --------------------
1310                          * Wake the first waiting process and grant him the lock if it
1311                          * doesn't conflict.  The woken process must record the lock
1312                          * him/herself.
1313                          * --------------------
1314                          */
1315                         waitQueue = &(lock->waitProcs);
1316                         ProcLockWakeup(waitQueue, lockmethod, lock);
1317                 }
1318
1319 #ifdef USER_LOCKS
1320 next_item:
1321 #endif
1322                 if (done)
1323                         break;
1324                 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1325                 xidLook = tmp;
1326         }
1327         SpinRelease(masterLock);
1328 #ifdef USER_LOCKS
1329
1330         /*
1331          * Reinitialize the queue only if nothing has been left in.
1332          */
1333         if (nskip == 0)
1334 #endif
1335                 SHMQueueInit(lockQueue);
1336         return TRUE;
1337 }
1338
1339 int
1340 LockShmemSize()
1341 {
1342         int                     size = 0;
1343         int                     nLockBuckets,
1344                                 nLockSegs;
1345         int                     nXidBuckets,
1346                                 nXidSegs;
1347
1348         nLockBuckets = 1 << (int) my_log2((NLOCKENTS - 1) / DEF_FFACTOR + 1);
1349         nLockSegs = 1 << (int) my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1350
1351         nXidBuckets = 1 << (int) my_log2((NLOCKS_PER_XACT - 1) / DEF_FFACTOR + 1);
1352         nXidSegs = 1 << (int) my_log2((nLockBuckets - 1) / DEF_SEGSIZE + 1);
1353
1354         size += MAXALIGN(NBACKENDS * sizeof(PROC)); /* each MyProc */
1355         size += MAXALIGN(NBACKENDS * sizeof(LOCKMETHODCTL)); /* each lockMethodTable->ctl */
1356         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1357
1358         size += MAXALIGN(my_log2(NLOCKENTS) * sizeof(void *));
1359         size += MAXALIGN(sizeof(HHDR));
1360         size += nLockSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1361         size += NLOCKENTS *                     /* XXX not multiple of BUCKET_ALLOC_INCR? */
1362                 (MAXALIGN(sizeof(BUCKET_INDEX)) +
1363                  MAXALIGN(sizeof(LOCK)));               /* contains hash key */
1364
1365         size += MAXALIGN(my_log2(NBACKENDS) * sizeof(void *));
1366         size += MAXALIGN(sizeof(HHDR));
1367         size += nXidSegs * MAXALIGN(DEF_SEGSIZE * sizeof(SEGMENT));
1368         size += NBACKENDS *                     /* XXX not multiple of BUCKET_ALLOC_INCR? */
1369                 (MAXALIGN(sizeof(BUCKET_INDEX)) +
1370                  MAXALIGN(sizeof(XIDLookupEnt)));               /* contains hash key */
1371
1372         return size;
1373 }
1374
1375 /* -----------------
1376  * Boolean function to determine current locking status
1377  * -----------------
1378  */
1379 bool
1380 LockingDisabled()
1381 {
1382         return LockingIsDisabled;
1383 }
1384
1385 /*
1386  * DeadlockCheck -- Checks for deadlocks for a given process
1387  *
1388  * We can't block on user locks, so no sense testing for deadlock
1389  * because there is no blocking, and no timer for the block.
1390  *
1391  * This code takes a list of locks a process holds, and the lock that
1392  * the process is sleeping on, and tries to find if any of the processes
1393  * waiting on its locks hold the lock it is waiting for.  If no deadlock
1394  * is found, it goes on to look at all the processes waiting on their locks.
1395  *
1396  * We have already locked the master lock before being called.
1397  */
1398 bool
1399 DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
1400 {
1401         int                     done;
1402         XIDLookupEnt *xidLook = NULL;
1403         XIDLookupEnt *tmp = NULL;
1404         SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1405         LOCK       *lock;
1406
1407         LOCKMETHODTABLE    *lockMethodTable;
1408         XIDLookupEnt *result,
1409                                 item;
1410         HTAB       *xidTable;
1411         bool            found;
1412
1413         static PROC *checked_procs[MaxBackendId];
1414         static int      nprocs;
1415         static bool MyNHolding;
1416
1417         /* initialize at start of recursion */
1418         if (skip_check)
1419         {
1420                 checked_procs[0] = MyProc;
1421                 nprocs = 1;
1422
1423                 lockMethodTable = LockMethodTable[1];
1424                 xidTable = lockMethodTable->xidHash;
1425
1426                 MemSet(&item, 0, XID_TAGSIZE);
1427                 TransactionIdStore(MyProc->xid, &item.tag.xid);
1428                 item.tag.lock = MAKE_OFFSET(findlock);
1429 #if 0
1430                 item.tag.pid = pid;
1431 #endif
1432
1433                 if (!(result = (XIDLookupEnt *)
1434                           hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
1435                 {
1436                         elog(NOTICE, "LockAcquire: xid table corrupted");
1437                         return true;
1438                 }
1439                 MyNHolding = result->nHolding;
1440         }
1441         if (SHMQueueEmpty(lockQueue))
1442                 return false;
1443
1444         SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1445
1446         XID_PRINT("DeadLockCheck", xidLook);
1447
1448         for (;;)
1449         {
1450                 /* ---------------------------
1451                  * XXX Here we assume the shared memory queue is circular and
1452                  * that we know its internal structure.  Should have some sort of
1453                  * macros to allow one to walk it.      mer 20 July 1991
1454                  * ---------------------------
1455                  */
1456                 done = (xidLook->queue.next == end);
1457                 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1458
1459                 LOCK_PRINT("DeadLockCheck", (&lock->tag), 0);
1460
1461                 /*
1462                  * This is our only check to see if we found the lock we want.
1463                  *
1464                  * The lock we are waiting for is already in MyProc->lockQueue so we
1465                  * need to skip it here.  We are trying to find it in someone
1466                  * else's lockQueue.
1467                  */
1468                 if (lock == findlock && !skip_check)
1469                         return true;
1470
1471                 {
1472                         PROC_QUEUE *waitQueue = &(lock->waitProcs);
1473                         PROC       *proc;
1474                         int                     i;
1475                         int                     j;
1476
1477                         proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
1478                         for (i = 0; i < waitQueue->size; i++)
1479                         {
1480                                 if (proc != MyProc &&
1481                                         lock == findlock && /* skip_check also true */
1482                                         MyNHolding) /* I already hold some lock on it */
1483                                 {
1484
1485                                         /*
1486                                          * For findlock's wait queue, we are interested in
1487                                          * procs who are blocked waiting for a write-lock on
1488                                          * the table we are waiting on, and already hold a
1489                                          * lock on it. We first check to see if there is an
1490                                          * escalation deadlock, where we hold a readlock and
1491                                          * want a writelock, and someone else holds readlock
1492                                          * on the same table, and wants a writelock.
1493                                          *
1494                                          * Basically, the test is, "Do we both hold some lock on
1495                                          * findlock, and we are both waiting in the lock
1496                                          * queue?"
1497                                          */
1498
1499                                         Assert(skip_check);
1500                                         Assert(MyProc->prio == 2);
1501
1502                                         lockMethodTable = LockMethodTable[1];
1503                                         xidTable = lockMethodTable->xidHash;
1504
1505                                         MemSet(&item, 0, XID_TAGSIZE);
1506                                         TransactionIdStore(proc->xid, &item.tag.xid);
1507                                         item.tag.lock = MAKE_OFFSET(findlock);
1508 #if 0
1509                                         item.tag.pid = pid;
1510 #endif
1511
1512                                         if (!(result = (XIDLookupEnt *)
1513                                                   hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
1514                                         {
1515                                                 elog(NOTICE, "LockAcquire: xid table corrupted");
1516                                                 return true;
1517                                         }
1518                                         if (result->nHolding)
1519                                                 return true;
1520                                 }
1521
1522                                 /*
1523                                  * No sense in looking at the wait queue of the lock we
1524                                  * are looking for. If lock == findlock, and I got here,
1525                                  * skip_check must be true too.
1526                                  */
1527                                 if (lock != findlock)
1528                                 {
1529                                         for (j = 0; j < nprocs; j++)
1530                                                 if (checked_procs[j] == proc)
1531                                                         break;
1532                                         if (j >= nprocs && lock != findlock)
1533                                         {
1534                                                 checked_procs[nprocs++] = proc;
1535                                                 Assert(nprocs <= MaxBackendId);
1536
1537                                                 /*
1538                                                  * For non-MyProc entries, we are looking only
1539                                                  * waiters, not necessarily people who already
1540                                                  * hold locks and are waiting. Now we check for
1541                                                  * cases where we have two or more tables in a
1542                                                  * deadlock.  We do this by continuing to search
1543                                                  * for someone holding a lock
1544                                                  */
1545                                                 if (DeadLockCheck(&(proc->lockQueue), findlock, false))
1546                                                         return true;
1547                                         }
1548                                 }
1549                                 proc = (PROC *) MAKE_PTR(proc->links.prev);
1550                         }
1551                 }
1552
1553                 if (done)
1554                         break;
1555                 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1556                 xidLook = tmp;
1557         }
1558
1559         /* if we got here, no deadlock */
1560         return false;
1561 }
1562
1563 #ifdef DEADLOCK_DEBUG
1564 /*
1565  * Dump all locks. Must have already acquired the masterLock.
1566  */
1567 void
1568 DumpLocks()
1569 {
1570         SHMEM_OFFSET location;
1571         PROC       *proc;
1572         SHM_QUEUE  *lockQueue;
1573         int                     done;
1574         XIDLookupEnt *xidLook = NULL;
1575         XIDLookupEnt *tmp = NULL;
1576         SHMEM_OFFSET end;
1577         SPINLOCK        masterLock;
1578         int                     numLockModes;
1579         LOCK       *lock;
1580
1581         count;
1582         int                     lockmethod = 1;
1583         LOCKMETHODTABLE    *lockMethodTable;
1584
1585         ShmemPIDLookup(MyProcPid, &location);
1586         if (location == INVALID_OFFSET)
1587                 return;
1588         proc = (PROC *) MAKE_PTR(location);
1589         if (proc != MyProc)
1590                 return;
1591         lockQueue = &proc->lockQueue;
1592
1593         Assert(lockmethod < NumLockMethods);
1594         lockMethodTable = LockMethodTable[lockmethod];
1595         if (!lockMethodTable)
1596                 return;
1597
1598         numLockModes = lockMethodTable->ctl->numLockModes;
1599         masterLock = lockMethodTable->ctl->masterLock;
1600
1601         if (SHMQueueEmpty(lockQueue))
1602                 return;
1603
1604         SHMQueueFirst(lockQueue, (Pointer *) &xidLook, &xidLook->queue);
1605         end = MAKE_OFFSET(lockQueue);
1606
1607         LOCK_DUMP("DumpLocks", MyProc->waitLock, 0);
1608         XID_PRINT("DumpLocks", xidLook);
1609
1610         for (count = 0;;)
1611         {
1612                 /* ---------------------------
1613                  * XXX Here we assume the shared memory queue is circular and
1614                  * that we know its internal structure.  Should have some sort of
1615                  * macros to allow one to walk it.      mer 20 July 1991
1616                  * ---------------------------
1617                  */
1618                 done = (xidLook->queue.next == end);
1619                 lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
1620
1621                 LOCK_DUMP("DumpLocks", lock, 0);
1622
1623                 if (count++ > 2000)
1624                 {
1625                         elog(NOTICE, "DumpLocks: xid loop detected, giving up");
1626                         break;
1627                 }
1628
1629                 if (done)
1630                         break;
1631                 SHMQueueFirst(&xidLook->queue, (Pointer *) &tmp, &tmp->queue);
1632                 xidLook = tmp;
1633         }
1634 }
1635
1636 #endif