]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
7816a6c96894f11138c780863cc01780cc8b12ba
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.92 2001/08/23 23:06:38 tgl Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll,
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <sys/types.h>
34 #include <unistd.h>
35 #include <signal.h>
36
37 #include "access/xact.h"
38 #include "miscadmin.h"
39 #include "storage/proc.h"
40 #include "utils/memutils.h"
41 #include "utils/ps_status.h"
42
43
/* This configuration variable is used to set the lock table size */
int			max_locks_per_xact;		/* set by guc.c */

/* Max number of lock-table entries to size the shared hash tables for */
#define NLOCKENTS(maxBackends)	(max_locks_per_xact * (maxBackends))


/* Forward declarations for local routines defined later in this file */
static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
		   LOCK *lock, HOLDER *holder);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
				 int *myHolding);

/*
 * Printable names for the lock modes, indexed by LOCKMODE.
 * Slot 0 ("INVALID") is a placeholder: mode 0 is never a valid lock mode.
 */
static char *lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};

/* Error text reported when a deadlock is detected */
static char *DeadLockMessage = "Deadlock detected.\n\tSee the lock(l) manual page for a possible cause.";
69
70
71 #ifdef LOCK_DEBUG
72
73 /*------
74  * The following configuration options are available for lock debugging:
75  *
76  *         TRACE_LOCKS          -- give a bunch of output what's going on in this file
77  *         TRACE_USERLOCKS      -- same but for user locks
78  *         TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
79  *                                                 (use to avoid output on system tables)
80  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
81  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
82  *
83  * Furthermore, but in storage/ipc/spin.c:
84  *         TRACE_SPINLOCKS      -- trace spinlocks (pretty useless)
85  *
86  * Define LOCK_DEBUG at compile time to get all these enabled.
87  * --------
88  */
89
/* Lock-tracing GUC-style knobs; see the configuration comment above. */
int			Trace_lock_oidmin = BootstrapObjectIdData;	/* don't trace rels below this OID */
bool		Trace_locks = false;		/* trace DEFAULT_LOCKMETHOD locks */
bool		Trace_userlocks = false;	/* trace USER_LOCKMETHOD locks */
int			Trace_lock_table = 0;		/* trace this table OID unconditionally */
bool		Debug_deadlocks = false;	/* dump lock state at untimely occasions */
95
96
97 inline static bool
98 LOCK_DEBUG_ENABLED(const LOCK *lock)
99 {
100         return
101         (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
102           || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
103          && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
104         || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
105 }
106
107
/*
 * LOCK_PRINT -- if tracing is enabled for this lock, dump its full state
 * (per-mode request/grant counts, masks, wait-queue length) to the log.
 *
 * NOTE: prints fixed mode slots 1..7 only; slot 0 is never a valid mode.
 */
inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
	if (LOCK_DEBUG_ENABLED(lock))
		elog(DEBUG,
			 "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
			 where, MAKE_OFFSET(lock),
			 lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
			 lock->tag.objId.blkno, lock->grantMask,
			 lock->requested[1], lock->requested[2], lock->requested[3],
			 lock->requested[4], lock->requested[5], lock->requested[6],
			 lock->requested[7], lock->nRequested,
			 lock->granted[1], lock->granted[2], lock->granted[3],
			 lock->granted[4], lock->granted[5], lock->granted[6],
			 lock->granted[7], lock->nGranted,
			 lock->waitProcs.size, lock_mode_names[type]);
}
127
128
/*
 * HOLDER_PRINT -- if tracing is enabled for the holder's underlying LOCK,
 * dump the holder's per-mode holding counts to the log.
 *
 * The enabling condition mirrors LOCK_DEBUG_ENABLED, but must chase the
 * holder's shared-memory offset back to the LOCK to get the relation OID.
 * Prints fixed mode slots 1..7 only; slot 0 is never a valid mode.
 */
inline static void
HOLDER_PRINT(const char *where, const HOLDER *holderP)
{
	if (
	 (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
	   || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
	  && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
		|| (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
	)
		elog(DEBUG,
			 "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
			 where, MAKE_OFFSET(holderP), holderP->tag.lock,
			 HOLDER_LOCKMETHOD(*(holderP)),
			 holderP->tag.proc, holderP->tag.xid,
		   holderP->holding[1], holderP->holding[2], holderP->holding[3],
		   holderP->holding[4], holderP->holding[5], holderP->holding[6],
			 holderP->holding[7], holderP->nHolding);
}
147
#else							/* not LOCK_DEBUG */

/* In non-debug builds the trace helpers compile away to nothing. */
#define LOCK_PRINT(where, lock, type)
#define HOLDER_PRINT(where, holderP)

#endif	 /* not LOCK_DEBUG */
154
155
156
SPINLOCK	LockMgrLock;		/* in Shmem or created in
								 * CreateSpinlocks() */

/*
 * These are to simplify/speed up some bit arithmetic.
 *
 * XXX is a fetch from a static array really faster than a shift?
 * Wouldn't bet on it...
 */

/* BITS_ON[m] has only bit m set; BITS_OFF[m] is its complement (InitLocks) */
static LOCKMASK BITS_OFF[MAX_LOCKMODES];
static LOCKMASK BITS_ON[MAX_LOCKMODES];

/*
 * Disable flag
 */
/* When true, LockAcquire reports success without doing anything */
static bool LockingIsDisabled;

/*
 * map from lockmethod to the lock table structure
 */
/* Slot 0 is never used: lockmethod IDs start at 1 ("no zero-th table") */
static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];

/* One past the highest lockmethod ID handed out; valid IDs are 1..N-1 */
static int	NumLockMethods;
181
182 /*
183  * InitLocks -- Init the lock module.  Create a private data
184  *              structure for constructing conflict masks.
185  */
186 void
187 InitLocks(void)
188 {
189         int                     i;
190         int                     bit;
191
192         bit = 1;
193         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
194         {
195                 BITS_ON[i] = bit;
196                 BITS_OFF[i] = ~bit;
197         }
198 }
199
200 /*
201  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
202  */
203 void
204 LockDisable(bool status)
205 {
206         LockingIsDisabled = status;
207 }
208
209 /*
210  * Boolean function to determine current locking status
211  */
212 bool
213 LockingDisabled(void)
214 {
215         return LockingIsDisabled;
216 }
217
218 /*
219  * Fetch the lock method table associated with a given lock
220  */
221 LOCKMETHODTABLE *
222 GetLocksMethodTable(LOCK *lock)
223 {
224         LOCKMETHOD      lockmethod = LOCK_LOCKMETHOD(*lock);
225
226         Assert(lockmethod > 0 && lockmethod < NumLockMethods);
227         return LockMethodTable[lockmethod];
228 }
229
230
231 /*
232  * LockMethodInit -- initialize the lock table's lock type
233  *              structures
234  *
235  * Notes: just copying.  Should only be called once.
236  */
237 static void
238 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
239                            LOCKMASK *conflictsP,
240                            int *prioP,
241                            int numModes)
242 {
243         int                     i;
244
245         lockMethodTable->ctl->numLockModes = numModes;
246         numModes++;
247         for (i = 0; i < numModes; i++, prioP++, conflictsP++)
248         {
249                 lockMethodTable->ctl->conflictTab[i] = *conflictsP;
250                 lockMethodTable->ctl->prio[i] = *prioP;
251         }
252 }
253
254 /*
255  * LockMethodTableInit -- initialize a lock table structure
256  *
257  * Notes:
258  *              (a) a lock table has four separate entries in the shmem index
259  *              table.  This is because every shared hash table and spinlock
260  *              has its name stored in the shmem index at its creation.  It
261  *              is wasteful, in this case, but not much space is involved.
262  *
263  * NOTE: data structures allocated here are allocated permanently, using
264  * TopMemoryContext and shared memory.  We don't ever release them anyway,
265  * and in normal multi-backend operation the lock table structures set up
266  * by the postmaster are inherited by each backend, so they must be in
267  * TopMemoryContext.
268  */
LOCKMETHOD
LockMethodTableInit(char *tabName,
					LOCKMASK *conflictsP,
					int *prioP,
					int numModes,
					int maxBackends)
{
	LOCKMETHODTABLE *lockMethodTable;
	char	   *shmemName;
	HASHCTL		info;
	int			hash_flags;
	bool		found;
	long		init_table_size,
				max_table_size;

	/* Reject tables with more lock modes than we were compiled for */
	if (numModes >= MAX_LOCKMODES)
	{
		elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
			 numModes, MAX_LOCKMODES);
		return INVALID_LOCKMETHOD;
	}

	/* Compute init/max size to request for lock hashtables */
	max_table_size = NLOCKENTS(maxBackends);
	init_table_size = max_table_size / 10;

	/* Allocate a string for the shmem index table lookups. */
	/* This is just temp space in this routine, so palloc is OK. */
	/* +32 leaves room for the longest " (...)" suffix appended below */
	shmemName = (char *) palloc(strlen(tabName) + 32);

	/* each lock table has a non-shared, permanent header */
	lockMethodTable = (LOCKMETHODTABLE *)
		MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODTABLE));

	/*
	 * find/acquire the spinlock for the table
	 */
	SpinAcquire(LockMgrLock);

	/*
	 * allocate a control structure from shared memory or attach to it if
	 * it already exists.
	 */
	sprintf(shmemName, "%s (ctl)", tabName);
	lockMethodTable->ctl = (LOCKMETHODCTL *)
		ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);

	if (!lockMethodTable->ctl)
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);

	/*
	 * no zero-th table
	 */
	/*
	 * NOTE(review): this resets the method counter on every call -- it
	 * appears to assume this routine is called only while registering the
	 * first table in a process; verify against callers.
	 */
	NumLockMethods = 1;

	/*
	 * we're first - initialize
	 */
	if (!found)
	{
		MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
		lockMethodTable->ctl->masterLock = LockMgrLock;
		lockMethodTable->ctl->lockmethod = NumLockMethods;
	}

	/*
	 * other modules refer to the lock table by a lockmethod ID
	 */
	LockMethodTable[NumLockMethods] = lockMethodTable;
	NumLockMethods++;
	Assert(NumLockMethods <= MAX_LOCK_METHODS);

	/*
	 * allocate a hash table for LOCK structs.	This is used to store
	 * per-locked-object information.
	 */
	info.keysize = SHMEM_LOCKTAB_KEYSIZE;
	info.datasize = SHMEM_LOCKTAB_DATASIZE;
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	sprintf(shmemName, "%s (lock hash)", tabName);
	lockMethodTable->lockHash = ShmemInitHash(shmemName,
											  init_table_size,
											  max_table_size,
											  &info,
											  hash_flags);

	if (!lockMethodTable->lockHash)
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
	Assert(lockMethodTable->lockHash->hash == tag_hash);

	/*
	 * allocate a hash table for HOLDER structs.  This is used to store
	 * per-lock-holder information.
	 */
	info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
	info.datasize = SHMEM_HOLDERTAB_DATASIZE;
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	sprintf(shmemName, "%s (holder hash)", tabName);
	lockMethodTable->holderHash = ShmemInitHash(shmemName,
												init_table_size,
												max_table_size,
												&info,
												hash_flags);

	if (!lockMethodTable->holderHash)
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);

	/* init ctl data structures */
	LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);

	SpinRelease(LockMgrLock);

	pfree(shmemName);

	return lockMethodTable->ctl->lockmethod;
}
389
390 /*
391  * LockMethodTableRename -- allocate another lockmethod ID to the same
392  *              lock table.
393  *
394  * NOTES: Both the lock module and the lock chain (lchain.c)
395  *              module use table id's to distinguish between different
396  *              kinds of locks.  Short term and long term locks look
397  *              the same to the lock table, but are handled differently
398  *              by the lock chain manager.      This function allows the
399  *              client to use different lockmethods when acquiring/releasing
400  *              short term and long term locks, yet store them all in one hashtable.
401  */
402
403 LOCKMETHOD
404 LockMethodTableRename(LOCKMETHOD lockmethod)
405 {
406         LOCKMETHOD      newLockMethod;
407
408         if (NumLockMethods >= MAX_LOCK_METHODS)
409                 return INVALID_LOCKMETHOD;
410         if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
411                 return INVALID_LOCKMETHOD;
412
413         /* other modules refer to the lock table by a lockmethod ID */
414         newLockMethod = NumLockMethods;
415         NumLockMethods++;
416
417         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
418         return newLockMethod;
419 }
420
421 /*
422  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
423  *              set lock if/when no conflicts.
424  *
425  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
426  *              a FALSE return is to be expected if dontWait is TRUE;
427  *              but if dontWait is FALSE, only a parameter error can cause
428  *              a FALSE return.  (XXX probably we should just elog on parameter
429  *              errors, instead of conflating this with failure to acquire lock?)
430  *
431  * Side Effects: The lock is acquired and recorded in lock tables.
432  *
433  * NOTE: if we wait for the lock, there is no way to abort the wait
434  * short of aborting the transaction.
435  *
436  *
437  * Note on User Locks:
438  *
439  *              User locks are handled totally on the application side as
440  *              long term cooperative locks which extend beyond the normal
441  *              transaction boundaries.  Their purpose is to indicate to an
442  *              application that someone is `working' on an item.  So it is
443  *              possible to put an user lock on a tuple's oid, retrieve the
444  *              tuple, work on it for an hour and then update it and remove
445  *              the lock.  While the lock is active other clients can still
446  *              read and write the tuple but they can be aware that it has
447  *              been locked at the application level by someone.
448  *              User locks use lock tags made of an uint16 and an uint32, for
449  *              example 0 and a tuple oid, or any other arbitrary pair of
450  *              numbers following a convention established by the application.
451  *              In this sense tags don't refer to tuples or database entities.
452  *              User locks and normal locks are completely orthogonal and
453  *              they don't interfere with each other, so it is possible
454  *              to acquire a normal lock on an user-locked tuple or user-lock
455  *              a tuple for which a normal write lock already exists.
456  *              User locks are always non blocking, therefore they are never
457  *              acquired if already held by another process.  They must be
458  *              released explicitly by the application but they are released
459  *              automatically when a backend terminates.
460  *              They are indicated by a lockmethod 2 which is an alias for the
461  *              normal lock table, and are distinguished from normal locks
462  *              by the following differences:
463  *
464  *                                                                              normal lock             user lock
465  *
466  *              lockmethod                                              1                               2
467  *              tag.dbId                                                database oid    database oid
468  *              tag.relId                                               rel oid or 0    0
469  *              tag.objId                                               block id                lock id2
470  *                                                                              or xact id
471  *              tag.offnum                                              0                               lock id1
472  *              holder.xid                                              xid or 0                0
473  *              persistence                                             transaction             user or backend
474  *                                                                              or backend
475  *
476  *              The lockmode parameter can have the same values for normal locks
477  *              although probably only WRITE_LOCK can have some practical use.
478  *
479  *                                                                                                              DZ - 22 Nov 1997
480  */
481
bool
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
			TransactionId xid, LOCKMODE lockmode, bool dontWait)
{
	HOLDER	   *holder;
	HOLDERTAG	holdertag;
	HTAB	   *holderTable;
	bool		found;
	LOCK	   *lock;
	SPINLOCK	masterLock;
	LOCKMETHODTABLE *lockMethodTable;
	int			status;
	int			myHolding[MAX_LOCKMODES];
	int			i;

#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
		elog(DEBUG, "LockAcquire: user lock [%u] %s",
			 locktag->objId.blkno, lock_mode_names[lockmode]);
#endif

	/* ???????? This must be changed when short term locks will be used */
	locktag->lockmethod = lockmethod;

	/* Look up the lock method table; fail softly on a bogus method ID. */
	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
	{
		elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
		return FALSE;
	}

	/* When the lock manager is disabled, every request "succeeds". */
	if (LockingIsDisabled)
		return TRUE;

	masterLock = lockMethodTable->ctl->masterLock;

	/* All lock-table state below is protected by the master spinlock. */
	SpinAcquire(masterLock);

	/*
	 * Find or create a lock with this tag
	 */
	Assert(lockMethodTable->lockHash->hash == tag_hash);
	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
								HASH_ENTER, &found);
	if (!lock)
	{
		SpinRelease(masterLock);
		elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
		return FALSE;
	}

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->lockHolders));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("LockAcquire: new", lock, lockmode);
	}
	else
	{
		/* Pre-existing lock object: sanity-check its counters. */
		LOCK_PRINT("LockAcquire: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the holder table.
	 */
	MemSet(&holdertag, 0, sizeof(HOLDERTAG));	/* must clear padding,
												 * needed */
	holdertag.lock = MAKE_OFFSET(lock);
	holdertag.proc = MAKE_OFFSET(MyProc);
	TransactionIdStore(xid, &holdertag.xid);

	/*
	 * Find or create a holder entry with this tag
	 */
	holderTable = lockMethodTable->holderHash;
	holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
									HASH_ENTER, &found);
	if (!holder)
	{
		SpinRelease(masterLock);
		elog(FATAL, "LockAcquire: holder table corrupted");
		return FALSE;
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		holder->nHolding = 0;
		MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
		/* Add holder to appropriate lists */
		SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
		SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
		HOLDER_PRINT("LockAcquire: new", holder);
	}
	else
	{
		HOLDER_PRINT("LockAcquire: found", holder);
		Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
		Assert(holder->nHolding <= lock->nGranted);

#ifdef CHECK_DEADLOCK_RISK

		/*
		 * Issue warning if we already hold a lower-level lock on this
		 * object and do not hold a lock of the requested level or higher.
		 * This indicates a deadlock-prone coding practice (eg, we'd have
		 * a deadlock if another backend were following the same code path
		 * at about the same time).
		 *
		 * This is not enabled by default, because it may generate log
		 * entries about user-level coding practices that are in fact safe
		 * in context. It can be enabled to help find system-level
		 * problems.
		 *
		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
		 * better to use a table.  For now, though, this works.
		 */
		for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
		{
			if (holder->holding[i] > 0)
			{
				if (i >= (int) lockmode)
					break;		/* safe: we have a lock >= req level */
				elog(DEBUG, "Deadlock risk: raising lock level"
					 " from %s to %s on object %u/%u/%u",
					 lock_mode_names[i], lock_mode_names[lockmode],
				 lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
				break;
			}
		}
#endif   /* CHECK_DEADLOCK_RISK */
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those
	 * immediately. The other counts don't increment till we get the lock.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * If I already hold one or more locks of the requested type, just
	 * grant myself another one without blocking.
	 */
	if (holder->holding[lockmode] > 0)
	{
		GrantLock(lock, holder, lockmode);
		HOLDER_PRINT("LockAcquire: owning", holder);
		SpinRelease(masterLock);
		return TRUE;
	}

	/*
	 * If this process (under any XID) is a holder of the lock, also grant
	 * myself another one without blocking.
	 */
	LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
	if (myHolding[lockmode] > 0)
	{
		GrantLock(lock, holder, lockmode);
		HOLDER_PRINT("LockAcquire: my other XID owning", holder);
		SpinRelease(masterLock);
		return TRUE;
	}

	/*
	 * If lock requested conflicts with locks requested by waiters, must
	 * join wait queue.  Otherwise, check for conflict with already-held
	 * locks.  (That's last because most complex check.)
	 */
	if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
		status = STATUS_FOUND;
	else
		status = LockCheckConflicts(lockMethodTable, lockmode,
									lock, holder,
									MyProc, myHolding);

	if (status == STATUS_OK)
	{
		/* No conflict with held or previously requested locks */
		GrantLock(lock, holder, lockmode);
	}
	else
	{
		Assert(status == STATUS_FOUND);

		/*
		 * We can't acquire the lock immediately.  If caller specified no
		 * blocking, remove the holder entry and return FALSE without waiting.
		 */
		if (dontWait)
		{
			/* Only remove the holder entry if it records no granted locks. */
			if (holder->nHolding == 0)
			{
				SHMQueueDelete(&holder->lockLink);
				SHMQueueDelete(&holder->procLink);
				holder = (HOLDER *) hash_search(holderTable,
												(Pointer) holder,
												HASH_REMOVE, &found);
				if (!holder || !found)
					elog(NOTICE, "LockAcquire: remove holder, table corrupted");
			}
			else
				HOLDER_PRINT("LockAcquire: NHOLDING", holder);

			/* Undo the provisional request counts bumped above. */
			lock->nRequested--;
			lock->requested[lockmode]--;
			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
			Assert(lock->nGranted <= lock->nRequested);
			SpinRelease(masterLock);
			return FALSE;
		}

		/*
		 * Construct bitmask of locks this process holds on this object.
		 */
		{
			int			heldLocks = 0;
			int			tmpMask;

			/* tmpMask starts at 2: bit 1 corresponds to lock mode 1. */
			for (i = 1, tmpMask = 2;
				 i <= lockMethodTable->ctl->numLockModes;
				 i++, tmpMask <<= 1)
			{
				if (myHolding[i] > 0)
					heldLocks |= tmpMask;
			}
			MyProc->heldLocks = heldLocks;
		}

		/*
		 * Sleep till someone wakes me up.
		 */
		status = WaitOnLock(lockmethod, lockmode, lock, holder);

		/*
		 * NOTE: do not do any material change of state between here and
		 * return.	All required changes in locktable state must have been
		 * done when the lock was granted to us --- see notes in
		 * WaitOnLock.
		 */

		/*
		 * Check the holder entry status, in case something in the ipc
		 * communication doesn't work correctly.
		 */
		if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
		{
			HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
			/* Should we retry ? */
			SpinRelease(masterLock);
			return FALSE;
		}
		HOLDER_PRINT("LockAcquire: granted", holder);
		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
	}

	SpinRelease(masterLock);

	return status == STATUS_OK;
}
760
761 /*
762  * LockCheckConflicts -- test whether requested lock conflicts
763  *              with those already granted
764  *
765  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
766  *
767  * NOTES:
768  *              Here's what makes this complicated: one process's locks don't
769  * conflict with one another, even if they are held under different
770  * transaction IDs (eg, session and xact locks do not conflict).
771  * So, we must subtract off our own locks when determining whether the
772  * requested new lock conflicts with those already held.
773  *
774  * The caller can optionally pass the process's total holding counts, if
775  * known.  If NULL is passed then these values will be computed internally.
776  */
777 int
778 LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
779                                    LOCKMODE lockmode,
780                                    LOCK *lock,
781                                    HOLDER *holder,
782                                    PROC *proc,
783                                    int *myHolding)              /* myHolding[] array or NULL */
784 {
785         LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
786         int                     numLockModes = lockctl->numLockModes;
787         int                     bitmask;
788         int                     i,
789                                 tmpMask;
790         int                     localHolding[MAX_LOCKMODES];
791
792         /*
793          * first check for global conflicts: If no locks conflict with my
794          * request, then I get the lock.
795          *
796          * Checking for conflict: lock->grantMask represents the types of
797          * currently held locks.  conflictTable[lockmode] has a bit set for
798          * each type of lock that conflicts with request.       Bitwise compare
799          * tells if there is a conflict.
800          */
801         if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
802         {
803                 HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
804                 return STATUS_OK;
805         }
806
807         /*
808          * Rats.  Something conflicts. But it could still be my own lock.  We
809          * have to construct a conflict mask that does not reflect our own
810          * locks.  Locks held by the current process under another XID also
811          * count as "our own locks".
812          */
813         if (myHolding == NULL)
814         {
815                 /* Caller didn't do calculation of total holding for me */
816                 LockCountMyLocks(holder->tag.lock, proc, localHolding);
817                 myHolding = localHolding;
818         }
819
820         /* Compute mask of lock types held by other processes */
821         bitmask = 0;
822         tmpMask = 2;
823         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
824         {
825                 if (lock->granted[i] != myHolding[i])
826                         bitmask |= tmpMask;
827         }
828
829         /*
830          * now check again for conflicts.  'bitmask' describes the types of
831          * locks held by other processes.  If one of these conflicts with the
832          * kind of lock that I want, there is a conflict and I have to sleep.
833          */
834         if (!(lockctl->conflictTab[lockmode] & bitmask))
835         {
836                 /* no conflict. OK to get the lock */
837                 HOLDER_PRINT("LockCheckConflicts: resolved", holder);
838                 return STATUS_OK;
839         }
840
841         HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
842         return STATUS_FOUND;
843 }
844
845 /*
846  * LockCountMyLocks --- Count total number of locks held on a given lockable
847  *              object by a given process (under any transaction ID).
848  *
849  * XXX This could be rather slow if the process holds a large number of locks.
850  * Perhaps it could be sped up if we kept yet a third hashtable of per-
851  * process lock information.  However, for the normal case where a transaction
852  * doesn't hold a large number of locks, keeping such a table would probably
853  * be a net slowdown.
854  */
855 static void
856 LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
857 {
858         SHM_QUEUE  *procHolders = &(proc->procHolders);
859         HOLDER     *holder;
860         int                     i;
861
862         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
863
864         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
865                                                                          offsetof(HOLDER, procLink));
866
867         while (holder)
868         {
869                 if (lockOffset == holder->tag.lock)
870                 {
871                         for (i = 1; i < MAX_LOCKMODES; i++)
872                                 myHolding[i] += holder->holding[i];
873                 }
874
875                 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
876                                                                                  offsetof(HOLDER, procLink));
877         }
878 }
879
880 /*
881  * GrantLock -- update the lock and holder data structures to show
882  *              the lock request has been granted.
883  *
884  * NOTE: if proc was blocked, it also needs to be removed from the wait list
885  * and have its waitLock/waitHolder fields cleared.  That's not done here.
886  */
887 void
888 GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
889 {
890         lock->nGranted++;
891         lock->granted[lockmode]++;
892         lock->grantMask |= BITS_ON[lockmode];
893         if (lock->granted[lockmode] == lock->requested[lockmode])
894                 lock->waitMask &= BITS_OFF[lockmode];
895         LOCK_PRINT("GrantLock", lock, lockmode);
896         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
897         Assert(lock->nGranted <= lock->nRequested);
898         holder->holding[lockmode]++;
899         holder->nHolding++;
900         Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
901 }
902
903 /*
904  * WaitOnLock -- wait to acquire a lock
905  *
906  * Caller must have set MyProc->heldLocks to reflect locks already held
907  * on the lockable object by this process (under all XIDs).
908  *
909  * The locktable spinlock must be held at entry.
910  */
911 static int
912 WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
913                    LOCK *lock, HOLDER *holder)
914 {
915         LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
916         char       *new_status,
917                            *old_status;
918
919         Assert(lockmethod < NumLockMethods);
920
921         LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
922
923         old_status = pstrdup(get_ps_display());
924         new_status = (char *) palloc(strlen(old_status) + 10);
925         strcpy(new_status, old_status);
926         strcat(new_status, " waiting");
927         set_ps_display(new_status);
928
929         /*
930          * NOTE: Think not to put any shared-state cleanup after the call to
931          * ProcSleep, in either the normal or failure path.  The lock state
932          * must be fully set by the lock grantor, or by HandleDeadLock if we
933          * give up waiting for the lock.  This is necessary because of the
934          * possibility that a cancel/die interrupt will interrupt ProcSleep
935          * after someone else grants us the lock, but before we've noticed it.
936          * Hence, after granting, the locktable state must fully reflect the
937          * fact that we own the lock; we can't do additional work on return.
938          * Contrariwise, if we fail, any cleanup must happen in xact abort
939          * processing, not here, to ensure it will also happen in the
940          * cancel/die case.
941          */
942
943         if (ProcSleep(lockMethodTable,
944                                   lockmode,
945                                   lock,
946                                   holder) != STATUS_OK)
947         {
948
949                 /*
950                  * We failed as a result of a deadlock, see HandleDeadLock(). Quit
951                  * now.  Removal of the holder and lock objects, if no longer
952                  * needed, will happen in xact cleanup (see above for motivation).
953                  */
954                 LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
955                 SpinRelease(lockMethodTable->ctl->masterLock);
956                 elog(ERROR, DeadLockMessage);
957                 /* not reached */
958         }
959
960         set_ps_display(old_status);
961         pfree(old_status);
962         pfree(new_status);
963
964         LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
965         return STATUS_OK;
966 }
967
968 /*
969  * Remove a proc from the wait-queue it is on
970  * (caller must know it is on one).
971  *
972  * Locktable lock must be held by caller.
973  *
974  * NB: this does not remove the process' holder object, nor the lock object,
975  * even though their counts might now have gone to zero.  That will happen
976  * during a subsequent LockReleaseAll call, which we expect will happen
977  * during transaction cleanup.  (Removal of a proc from its wait queue by
978  * this routine can only happen if we are aborting the transaction.)
979  */
980 void
981 RemoveFromWaitQueue(PROC *proc)
982 {
983         LOCK       *waitLock = proc->waitLock;
984         LOCKMODE        lockmode = proc->waitLockMode;
985
986         /* Make sure proc is waiting */
987         Assert(proc->links.next != INVALID_OFFSET);
988         Assert(waitLock);
989         Assert(waitLock->waitProcs.size > 0);
990
991         /* Remove proc from lock's wait queue */
992         SHMQueueDelete(&(proc->links));
993         waitLock->waitProcs.size--;
994
995         /* Undo increments of request counts by waiting process */
996         Assert(waitLock->nRequested > 0);
997         Assert(waitLock->nRequested > proc->waitLock->nGranted);
998         waitLock->nRequested--;
999         Assert(waitLock->requested[lockmode] > 0);
1000         waitLock->requested[lockmode]--;
1001         /* don't forget to clear waitMask bit if appropriate */
1002         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1003                 waitLock->waitMask &= BITS_OFF[lockmode];
1004
1005         /* Clean up the proc's own state */
1006         proc->waitLock = NULL;
1007         proc->waitHolder = NULL;
1008
1009         /* See if any other waiters for the lock can be woken up now */
1010         ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
1011 }
1012
/*
 * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
 *		release one 'lockmode' lock on it.
 *
 * Returns TRUE on success, FALSE if the lock or holder entry cannot be
 * found (the caller is expected to print its own error message then).
 *
 * Side Effects: find any waiting processes that are now wakable,
 *		grant them their requested locks and awaken them.
 *		(We have to grant the lock here to avoid a race between
 *		the waking process and any new process to
 *		come along and request the lock.)
 */
bool
LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
			TransactionId xid, LOCKMODE lockmode)
{
	LOCK	   *lock;
	SPINLOCK	masterLock;
	bool		found;
	LOCKMETHODTABLE *lockMethodTable;
	HOLDER	   *holder;
	HOLDERTAG	holdertag;
	HTAB	   *holderTable;
	bool		wakeupNeeded = false;

#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
		elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
#endif

	/* ???????? This must be changed when short term locks will be used */
	locktag->lockmethod = lockmethod;

	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
	{
		elog(NOTICE, "lockMethodTable is null in LockRelease");
		return FALSE;
	}

	/* when locking is globally disabled, every release trivially succeeds */
	if (LockingIsDisabled)
		return TRUE;

	/* all locktable state below is protected by the masterLock spinlock */
	masterLock = lockMethodTable->ctl->masterLock;
	SpinAcquire(masterLock);

	/*
	 * Find a lock with this tag
	 */
	Assert(lockMethodTable->lockHash->hash == tag_hash);
	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
								HASH_FIND, &found);

	/*
	 * let the caller print its own error message, too. Do not
	 * elog(ERROR).
	 */
	if (!lock)
	{
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: locktable corrupted");
		return FALSE;
	}

	if (!found)
	{
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: no such lock");
		return FALSE;
	}
	LOCK_PRINT("LockRelease: found", lock, lockmode);

	/*
	 * Find the holder entry for this holder.  The tag is used as a hash
	 * key, so any padding bytes must be zeroed for lookups to match.
	 */
	MemSet(&holdertag, 0, sizeof(HOLDERTAG));	/* must clear padding,
												 * needed */
	holdertag.lock = MAKE_OFFSET(lock);
	holdertag.proc = MAKE_OFFSET(MyProc);
	TransactionIdStore(xid, &holdertag.xid);

	/*
	 * HASH_FIND_SAVE remembers the entry's position so the later
	 * HASH_REMOVE_SAVED call can delete it without a second lookup.
	 */
	holderTable = lockMethodTable->holderHash;
	holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
									HASH_FIND_SAVE, &found);
	if (!holder || !found)
	{
		SpinRelease(masterLock);
#ifdef USER_LOCKS
		if (!found && lockmethod == USER_LOCKMETHOD)
			elog(NOTICE, "LockRelease: no lock with this tag");
		else
#endif
			elog(NOTICE, "LockRelease: holder table corrupted");
		return FALSE;
	}
	HOLDER_PRINT("LockRelease: found", holder);

	/*
	 * Check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(holder->holding[lockmode] > 0))
	{
		HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
		Assert(holder->holding[lockmode] >= 0);
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: you don't own a lock of type %s",
			 lock_mode_names[lockmode]);
		return FALSE;
	}
	Assert(holder->nHolding > 0);
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);

	/*
	 * fix the general lock stats
	 */
	lock->nRequested--;
	lock->requested[lockmode]--;
	lock->nGranted--;
	lock->granted[lockmode]--;

	if (lock->granted[lockmode] == 0)
	{
		/* change the conflict mask.  No more of this lock type. */
		lock->grantMask &= BITS_OFF[lockmode];
	}

	LOCK_PRINT("LockRelease: updated", lock, lockmode);
	Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
	Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
	Assert(lock->nGranted <= lock->nRequested);

	/*
	 * We need only run ProcLockWakeup if the released lock conflicts with
	 * at least one of the lock types requested by waiter(s).  Otherwise
	 * whatever conflict made them wait must still exist.  NOTE: before
	 * MVCC, we could skip wakeup if lock->granted[lockmode] was still
	 * positive. But that's not true anymore, because the remaining
	 * granted locks might belong to some waiter, who could now be
	 * awakened because he doesn't conflict with his own locks.
	 */
	if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
		wakeupNeeded = true;

	if (lock->nRequested == 0)
	{

		/*
		 * if there's no one waiting in the queue, we just released the
		 * last lock on this object. Delete it from the lock table.
		 */
		Assert(lockMethodTable->lockHash->hash == tag_hash);
		lock = (LOCK *) hash_search(lockMethodTable->lockHash,
									(Pointer) &(lock->tag),
									HASH_REMOVE,
									&found);
		if (!lock || !found)
		{
			SpinRelease(masterLock);
			elog(NOTICE, "LockRelease: remove lock, table corrupted");
			return FALSE;
		}
		wakeupNeeded = false;	/* should be false, but make sure */
	}

	/*
	 * Now fix the per-holder lock stats.
	 */
	holder->holding[lockmode]--;
	holder->nHolding--;
	HOLDER_PRINT("LockRelease: updated", holder);
	Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));

	/*
	 * If this was my last hold on this lock, delete my entry in the
	 * holder table.
	 */
	if (holder->nHolding == 0)
	{
		HOLDER_PRINT("LockRelease: deleting", holder);
		SHMQueueDelete(&holder->lockLink);
		SHMQueueDelete(&holder->procLink);
		/*
		 * NOTE(review): this passes &holder (the address of the local
		 * pointer variable), unlike LockReleaseAll which passes the
		 * holder pointer itself.  Presumably harmless because
		 * HASH_REMOVE_SAVED uses the position saved by HASH_FIND_SAVE
		 * rather than the key argument -- verify against dynahash.
		 */
		holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
										HASH_REMOVE_SAVED, &found);
		if (!holder || !found)
		{
			SpinRelease(masterLock);
			elog(NOTICE, "LockRelease: remove holder, table corrupted");
			return FALSE;
		}
	}

	/*
	 * Wake up waiters if needed.
	 */
	if (wakeupNeeded)
		ProcLockWakeup(lockMethodTable, lock);

	SpinRelease(masterLock);
	return TRUE;
}
1215
/*
 * LockReleaseAll -- Release all locks in a process's lock list.
 *
 * Well, not really *all* locks.
 *
 * If 'allxids' is TRUE, all locks of the specified lock method are
 * released, regardless of transaction affiliation.
 *
 * If 'allxids' is FALSE, all locks of the specified lock method and
 * specified XID are released.
 *
 * Walks the proc's procHolders list once, removing each matching holder
 * entry and garbage-collecting lock objects whose request count drops
 * to zero.  Returns TRUE on success, FALSE on locktable corruption.
 */
bool
LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
			   bool allxids, TransactionId xid)
{
	SHM_QUEUE  *procHolders = &(proc->procHolders);
	HOLDER	   *holder;
	HOLDER	   *nextHolder;
	SPINLOCK	masterLock;
	LOCKMETHODTABLE *lockMethodTable;
	int			i,
				numLockModes;
	LOCK	   *lock;
	bool		found;

#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
		elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
			 lockmethod, proc->pid);
#endif

	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
	{
		elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
		return FALSE;
	}

	numLockModes = lockMethodTable->ctl->numLockModes;
	masterLock = lockMethodTable->ctl->masterLock;

	/* entire scan runs under the locktable spinlock */
	SpinAcquire(masterLock);

	holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
									 offsetof(HOLDER, procLink));

	while (holder)
	{
		bool		wakeupNeeded = false;

		/* Get link first, since we may unlink/delete this holder */
		nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
											 offsetof(HOLDER, procLink));

		Assert(holder->tag.proc == MAKE_OFFSET(proc));

		lock = (LOCK *) MAKE_PTR(holder->tag.lock);

		/* Ignore items that are not of the lockmethod to be removed */
		if (LOCK_LOCKMETHOD(*lock) != lockmethod)
			goto next_item;

		/* If not allxids, ignore items that are of the wrong xid */
		if (!allxids && !TransactionIdEquals(xid, holder->tag.xid))
			goto next_item;

		HOLDER_PRINT("LockReleaseAll", holder);
		LOCK_PRINT("LockReleaseAll", lock, 0);
		Assert(lock->nRequested >= 0);
		Assert(lock->nGranted >= 0);
		Assert(lock->nGranted <= lock->nRequested);
		Assert(holder->nHolding >= 0);
		Assert(holder->nHolding <= lock->nRequested);

		/*
		 * fix the general lock stats
		 */
		if (lock->nRequested != holder->nHolding)
		{
			/*
			 * Other processes (or other XIDs of ours) still reference
			 * this lock, so subtract our holdings mode by mode.
			 */
			for (i = 1; i <= numLockModes; i++)
			{
				Assert(holder->holding[i] >= 0);
				if (holder->holding[i] > 0)
				{
					lock->requested[i] -= holder->holding[i];
					lock->granted[i] -= holder->holding[i];
					Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
					if (lock->granted[i] == 0)
						lock->grantMask &= BITS_OFF[i];

					/*
					 * Read comments in LockRelease
					 */
					if (!wakeupNeeded &&
					lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
						wakeupNeeded = true;
				}
			}
			lock->nRequested -= holder->nHolding;
			lock->nGranted -= holder->nHolding;
			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
			Assert(lock->nGranted <= lock->nRequested);
		}
		else
		{
			/*
			 * This holder accounts for all the requested locks on the
			 * object, so we can be lazy and just zero things out.
			 */
			lock->nRequested = 0;
			lock->nGranted = 0;
			/* Fix the lock status, just for next LOCK_PRINT message. */
			for (i = 1; i <= numLockModes; i++)
			{
				Assert(lock->requested[i] == lock->granted[i]);
				lock->requested[i] = lock->granted[i] = 0;
			}
		}
		LOCK_PRINT("LockReleaseAll: updated", lock, 0);

		HOLDER_PRINT("LockReleaseAll: deleting", holder);

		/*
		 * Remove the holder entry from the linked lists
		 */
		SHMQueueDelete(&holder->lockLink);
		SHMQueueDelete(&holder->procLink);

		/*
		 * remove the holder entry from the hashtable
		 */
		holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
										(Pointer) holder,
										HASH_REMOVE,
										&found);
		if (!holder || !found)
		{
			SpinRelease(masterLock);
			elog(NOTICE, "LockReleaseAll: holder table corrupted");
			return FALSE;
		}

		if (lock->nRequested == 0)
		{

			/*
			 * We've just released the last lock, so garbage-collect the
			 * lock object.
			 */
			LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
			Assert(lockMethodTable->lockHash->hash == tag_hash);
			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
										(Pointer) &(lock->tag),
										HASH_REMOVE, &found);
			if (!lock || !found)
			{
				SpinRelease(masterLock);
				elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
				return FALSE;
			}
		}
		else if (wakeupNeeded)
			ProcLockWakeup(lockMethodTable, lock);

next_item:
		holder = nextHolder;
	}

	SpinRelease(masterLock);

#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
		elog(DEBUG, "LockReleaseAll: done");
#endif

	return TRUE;
}
1394
1395 int
1396 LockShmemSize(int maxBackends)
1397 {
1398         int                     size = 0;
1399         long            max_table_size = NLOCKENTS(maxBackends);
1400
1401         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1402         size += maxBackends * MAXALIGN(sizeof(PROC));           /* each MyProc */
1403         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each
1404                                                                                                                                  * lockMethodTable->ctl */
1405
1406         /* lockHash table */
1407         size += hash_estimate_size(max_table_size,
1408                                                            SHMEM_LOCKTAB_KEYSIZE,
1409                                                            SHMEM_LOCKTAB_DATASIZE);
1410
1411         /* holderHash table */
1412         size += hash_estimate_size(max_table_size,
1413                                                            SHMEM_HOLDERTAB_KEYSIZE,
1414                                                            SHMEM_HOLDERTAB_DATASIZE);
1415
1416         /*
1417          * Since the lockHash entry count above is only an estimate, add 10%
1418          * safety margin.
1419          */
1420         size += size / 10;
1421
1422         return size;
1423 }
1424
1425
1426 #ifdef LOCK_DEBUG
1427 /*
1428  * Dump all locks in the proc->procHolders list.
1429  *
1430  * Must have already acquired the masterLock.
1431  */
1432 void
1433 DumpLocks(void)
1434 {
1435         SHMEM_OFFSET location;
1436         PROC       *proc;
1437         SHM_QUEUE  *procHolders;
1438         HOLDER     *holder;
1439         LOCK       *lock;
1440         int                     lockmethod = DEFAULT_LOCKMETHOD;
1441         LOCKMETHODTABLE *lockMethodTable;
1442
1443         ShmemPIDLookup(MyProcPid, &location);
1444         if (location == INVALID_OFFSET)
1445                 return;
1446         proc = (PROC *) MAKE_PTR(location);
1447         if (proc != MyProc)
1448                 return;
1449         procHolders = &proc->procHolders;
1450
1451         Assert(lockmethod < NumLockMethods);
1452         lockMethodTable = LockMethodTable[lockmethod];
1453         if (!lockMethodTable)
1454                 return;
1455
1456         if (proc->waitLock)
1457                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1458
1459         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1460                                                                          offsetof(HOLDER, procLink));
1461
1462         while (holder)
1463         {
1464                 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1465
1466                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1467
1468                 HOLDER_PRINT("DumpLocks", holder);
1469                 LOCK_PRINT("DumpLocks", lock, 0);
1470
1471                 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1472                                                                                  offsetof(HOLDER, procLink));
1473         }
1474 }
1475
/*
 * Dump all postgres locks. Must have already acquired the masterLock.
 *
 * Unlike DumpLocks, this scans the entire holder hashtable, so it shows
 * holders belonging to every backend, not just our own.
 */
void
DumpAllLocks(void)
{
	SHMEM_OFFSET location;
	PROC	   *proc;
	HOLDER	   *holder = NULL;
	LOCK	   *lock;
	int			pid;
	int			lockmethod = DEFAULT_LOCKMETHOD;
	LOCKMETHODTABLE *lockMethodTable;
	HTAB	   *holderTable;
	HASH_SEQ_STATUS status;

	/* locate our own PROC struct; bail out quietly if we can't */
	pid = getpid();
	ShmemPIDLookup(pid, &location);
	if (location == INVALID_OFFSET)
		return;
	proc = (PROC *) MAKE_PTR(location);
	if (proc != MyProc)
		return;

	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
		return;

	holderTable = lockMethodTable->holderHash;

	if (proc->waitLock)
		LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);

	/*
	 * Scan the whole holder hashtable.  NOTE(review): the extra test
	 * against (HOLDER *) TRUE suggests this era's hash_seq_search can
	 * return TRUE as an end-of-scan sentinel distinct from NULL --
	 * verify against dynahash before changing.
	 */
	hash_seq_init(&status, holderTable);
	while ((holder = (HOLDER *) hash_seq_search(&status)) &&
		   (holder != (HOLDER *) TRUE))
	{
		HOLDER_PRINT("DumpAllLocks", holder);

		if (holder->tag.lock)
		{
			lock = (LOCK *) MAKE_PTR(holder->tag.lock);
			LOCK_PRINT("DumpAllLocks", lock, 0);
		}
		else
			elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
	}
}
1525
1526 #endif   /* LOCK_DEBUG */