/*
 * NOTE(review): the lines above/below were scraped from a git web viewer;
 * this file is src/backend/storage/lmgr/lock.c from PostgreSQL.
 */
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.93 2001/08/29 19:14:39 petere Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll,
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <sys/types.h>
34 #include <unistd.h>
35 #include <signal.h>
36
37 #include "access/xact.h"
38 #include "miscadmin.h"
39 #include "storage/proc.h"
40 #include "utils/memutils.h"
41 #include "utils/ps_status.h"
42
43
44 /* This configuration variable is used to set the lock table size */
45 int             max_locks_per_xact;             /* set by guc.c */
46
47 #define NLOCKENTS(maxBackends)  (max_locks_per_xact * (maxBackends))
48
49
50 static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
51                    LOCK *lock, HOLDER *holder);
52 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
53                                  int *myHolding);
54
55 static char *lock_mode_names[] =
56 {
57         "INVALID",
58         "AccessShareLock",
59         "RowShareLock",
60         "RowExclusiveLock",
61         "ShareUpdateExclusiveLock",
62         "ShareLock",
63         "ShareRowExclusiveLock",
64         "ExclusiveLock",
65         "AccessExclusiveLock"
66 };
67
68
69 #ifdef LOCK_DEBUG
70
71 /*------
72  * The following configuration options are available for lock debugging:
73  *
74  *         TRACE_LOCKS          -- give a bunch of output what's going on in this file
75  *         TRACE_USERLOCKS      -- same but for user locks
76  *         TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
77  *                                                 (use to avoid output on system tables)
78  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
79  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
80  *
81  * Furthermore, but in storage/ipc/spin.c:
82  *         TRACE_SPINLOCKS      -- trace spinlocks (pretty useless)
83  *
84  * Define LOCK_DEBUG at compile time to get all these enabled.
85  * --------
86  */
87
88 int                     Trace_lock_oidmin = BootstrapObjectIdData;
89 bool            Trace_locks = false;
90 bool            Trace_userlocks = false;
91 int                     Trace_lock_table = 0;
92 bool            Debug_deadlocks = false;
93
94
95 inline static bool
96 LOCK_DEBUG_ENABLED(const LOCK *lock)
97 {
98         return
99         (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
100           || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
101          && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
102         || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
103 }
104
105
/*
 * LOCK_PRINT -- emit a one-line DEBUG dump of a LOCK's state, tagged with
 * the caller-supplied location string, but only if tracing is enabled for
 * this lock (see LOCK_DEBUG_ENABLED).
 *
 * The dump shows the lock's shmem offset, lock method, tag fields, grant
 * bitmask, the per-mode requested[] and granted[] counts for modes 1..7
 * with their totals, the wait-queue length, and the mode being reported.
 * Note: slot 0 of requested[]/granted[] (the invalid mode) is not printed.
 */
inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
	if (LOCK_DEBUG_ENABLED(lock))
		elog(DEBUG,
			 "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
			 "req(%d,%d,%d,%d,%d,%d,%d)=%d "
			 "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
			 where, MAKE_OFFSET(lock),
			 lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
			 lock->tag.objId.blkno, lock->grantMask,
			 lock->requested[1], lock->requested[2], lock->requested[3],
			 lock->requested[4], lock->requested[5], lock->requested[6],
			 lock->requested[7], lock->nRequested,
			 lock->granted[1], lock->granted[2], lock->granted[3],
			 lock->granted[4], lock->granted[5], lock->granted[6],
			 lock->granted[7], lock->nGranted,
			 lock->waitProcs.size, lock_mode_names[type]);
}
125
126
127 inline static void
128 HOLDER_PRINT(const char *where, const HOLDER *holderP)
129 {
130         if (
131          (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
132            || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
133           && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
134                 || (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
135         )
136                 elog(DEBUG,
137                          "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
138                          where, MAKE_OFFSET(holderP), holderP->tag.lock,
139                          HOLDER_LOCKMETHOD(*(holderP)),
140                          holderP->tag.proc, holderP->tag.xid,
141                    holderP->holding[1], holderP->holding[2], holderP->holding[3],
142                    holderP->holding[4], holderP->holding[5], holderP->holding[6],
143                          holderP->holding[7], holderP->nHolding);
144 }
145
146 #else                                                   /* not LOCK_DEBUG */
147
148 #define LOCK_PRINT(where, lock, type)
149 #define HOLDER_PRINT(where, holderP)
150
151 #endif   /* not LOCK_DEBUG */
152
153
154
155 SPINLOCK        LockMgrLock;            /* in Shmem or created in
156                                                                  * CreateSpinlocks() */
157
158 /*
159  * These are to simplify/speed up some bit arithmetic.
160  *
161  * XXX is a fetch from a static array really faster than a shift?
162  * Wouldn't bet on it...
163  */
164
165 static LOCKMASK BITS_OFF[MAX_LOCKMODES];
166 static LOCKMASK BITS_ON[MAX_LOCKMODES];
167
168 /*
169  * Disable flag
170  */
171 static bool LockingIsDisabled;
172
173 /*
174  * map from lockmethod to the lock table structure
175  */
176 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
177
178 static int      NumLockMethods;
179
180 /*
181  * InitLocks -- Init the lock module.  Create a private data
182  *              structure for constructing conflict masks.
183  */
184 void
185 InitLocks(void)
186 {
187         int                     i;
188         int                     bit;
189
190         bit = 1;
191         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
192         {
193                 BITS_ON[i] = bit;
194                 BITS_OFF[i] = ~bit;
195         }
196 }
197
198 /*
199  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
200  */
/*
 * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
 *
 * While the flag is TRUE, LockAcquire (and presumably the other lock
 * operations) become no-ops that report success.
 */
void
LockDisable(bool status)
{
	LockingIsDisabled = status;
}
206
207 /*
208  * Boolean function to determine current locking status
209  */
/*
 * LockingDisabled -- report whether locking is currently disabled
 * (i.e., the current value of the flag set by LockDisable).
 */
bool
LockingDisabled(void)
{
	return LockingIsDisabled;
}
215
216 /*
217  * Fetch the lock method table associated with a given lock
218  */
/*
 * Fetch the lock method table associated with a given lock.
 *
 * Valid lockmethod IDs run from 1 to NumLockMethods-1 (slot 0 is never
 * used -- see LockMethodTableInit), which is what the Assert enforces.
 */
LOCKMETHODTABLE *
GetLocksMethodTable(LOCK *lock)
{
	LOCKMETHOD	lockmethod = LOCK_LOCKMETHOD(*lock);

	Assert(lockmethod > 0 && lockmethod < NumLockMethods);
	return LockMethodTable[lockmethod];
}
227
228
229 /*
230  * LockMethodInit -- initialize the lock table's lock type
231  *              structures
232  *
233  * Notes: just copying.  Should only be called once.
234  */
/*
 * LockMethodInit -- initialize the lock table's lock type
 *		structures
 *
 * Copies the caller-supplied conflict and priority tables into the shared
 * control structure and records the number of lock modes.
 *
 * Notes: just copying.  Should only be called once (per lock method).
 * The loop intentionally copies numModes+1 entries: slot 0 (the invalid
 * lock mode) is included, so the source arrays must have numModes+1
 * elements as well.
 */
static void
LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
			   LOCKMASK *conflictsP,
			   int *prioP,
			   int numModes)
{
	int			i;

	lockMethodTable->ctl->numLockModes = numModes;
	numModes++;					/* include slot 0 in the copy below */
	for (i = 0; i < numModes; i++, prioP++, conflictsP++)
	{
		lockMethodTable->ctl->conflictTab[i] = *conflictsP;
		lockMethodTable->ctl->prio[i] = *prioP;
	}
}
251
252 /*
253  * LockMethodTableInit -- initialize a lock table structure
254  *
255  * Notes:
256  *              (a) a lock table has four separate entries in the shmem index
257  *              table.  This is because every shared hash table and spinlock
258  *              has its name stored in the shmem index at its creation.  It
259  *              is wasteful, in this case, but not much space is involved.
260  *
261  * NOTE: data structures allocated here are allocated permanently, using
262  * TopMemoryContext and shared memory.  We don't ever release them anyway,
263  * and in normal multi-backend operation the lock table structures set up
264  * by the postmaster are inherited by each backend, so they must be in
265  * TopMemoryContext.
266  */
/*
 * LockMethodTableInit -- initialize a lock table structure
 *
 * Creates (or attaches to) the shared-memory control structure and the two
 * shared hash tables (LOCK and HOLDER) for a lock method, then fills in
 * the conflict/priority tables via LockMethodInit.
 *
 * Returns the lockmethod ID assigned to the new table, or
 * INVALID_LOCKMETHOD if numModes exceeds MAX_LOCKMODES.
 *
 * Notes:
 *		(a) a lock table has three separate entries in the shmem index
 *		table ("(ctl)", "(lock hash)", "(holder hash)").  This is because
 *		every shared hash table and structure has its name stored in the
 *		shmem index at its creation.  It is wasteful, in this case, but
 *		not much space is involved.
 *
 * NOTE: data structures allocated here are allocated permanently, using
 * TopMemoryContext and shared memory.  We don't ever release them anyway,
 * and in normal multi-backend operation the lock table structures set up
 * by the postmaster are inherited by each backend, so they must be in
 * TopMemoryContext.
 */
LOCKMETHOD
LockMethodTableInit(char *tabName,
					LOCKMASK *conflictsP,
					int *prioP,
					int numModes,
					int maxBackends)
{
	LOCKMETHODTABLE *lockMethodTable;
	char	   *shmemName;
	HASHCTL		info;
	int			hash_flags;
	bool		found;
	long		init_table_size,
				max_table_size;

	if (numModes >= MAX_LOCKMODES)
	{
		elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
			 numModes, MAX_LOCKMODES);
		return INVALID_LOCKMETHOD;
	}

	/* Compute init/max size to request for lock hashtables */
	max_table_size = NLOCKENTS(maxBackends);
	init_table_size = max_table_size / 10;

	/*
	 * Allocate a string for the shmem index table lookups.  The +32 leaves
	 * room for the longest " (holder hash)" style suffix appended below.
	 * This is just temp space in this routine, so palloc is OK.
	 */
	shmemName = (char *) palloc(strlen(tabName) + 32);

	/* each lock table has a non-shared, permanent header */
	lockMethodTable = (LOCKMETHODTABLE *)
		MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODTABLE));

	/*
	 * find/acquire the spinlock for the table -- held across all the
	 * shared-memory initialization below.
	 */
	SpinAcquire(LockMgrLock);

	/*
	 * allocate a control structure from shared memory or attach to it if
	 * it already exists.
	 */
	sprintf(shmemName, "%s (ctl)", tabName);
	lockMethodTable->ctl = (LOCKMETHODCTL *)
		ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);

	if (!lockMethodTable->ctl)
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);

	/*
	 * no zero-th table: valid lockmethod IDs start at 1.
	 *
	 * NOTE(review): this unconditionally resets NumLockMethods to 1; it
	 * appears to assume lock method tables are created in a fixed order
	 * during startup -- confirm before calling from anywhere else.
	 */
	NumLockMethods = 1;

	/*
	 * we're first - initialize the shared control structure
	 */
	if (!found)
	{
		MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
		lockMethodTable->ctl->masterLock = LockMgrLock;
		lockMethodTable->ctl->lockmethod = NumLockMethods;
	}

	/*
	 * other modules refer to the lock table by a lockmethod ID
	 */
	LockMethodTable[NumLockMethods] = lockMethodTable;
	NumLockMethods++;
	Assert(NumLockMethods <= MAX_LOCK_METHODS);

	/*
	 * allocate a hash table for LOCK structs.	This is used to store
	 * per-locked-object information.
	 */
	info.keysize = SHMEM_LOCKTAB_KEYSIZE;
	info.datasize = SHMEM_LOCKTAB_DATASIZE;
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	sprintf(shmemName, "%s (lock hash)", tabName);
	lockMethodTable->lockHash = ShmemInitHash(shmemName,
											  init_table_size,
											  max_table_size,
											  &info,
											  hash_flags);

	if (!lockMethodTable->lockHash)
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
	Assert(lockMethodTable->lockHash->hash == tag_hash);

	/*
	 * allocate a hash table for HOLDER structs.  This is used to store
	 * per-lock-holder information.
	 */
	info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
	info.datasize = SHMEM_HOLDERTAB_DATASIZE;
	info.hash = tag_hash;
	hash_flags = (HASH_ELEM | HASH_FUNCTION);

	sprintf(shmemName, "%s (holder hash)", tabName);
	lockMethodTable->holderHash = ShmemInitHash(shmemName,
												init_table_size,
												max_table_size,
												&info,
												hash_flags);

	if (!lockMethodTable->holderHash)
		elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);

	/* init ctl data structures */
	LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);

	SpinRelease(LockMgrLock);

	pfree(shmemName);

	return lockMethodTable->ctl->lockmethod;
}
387
388 /*
389  * LockMethodTableRename -- allocate another lockmethod ID to the same
390  *              lock table.
391  *
392  * NOTES: Both the lock module and the lock chain (lchain.c)
393  *              module use table id's to distinguish between different
394  *              kinds of locks.  Short term and long term locks look
395  *              the same to the lock table, but are handled differently
396  *              by the lock chain manager.      This function allows the
397  *              client to use different lockmethods when acquiring/releasing
398  *              short term and long term locks, yet store them all in one hashtable.
399  */
400
401 LOCKMETHOD
402 LockMethodTableRename(LOCKMETHOD lockmethod)
403 {
404         LOCKMETHOD      newLockMethod;
405
406         if (NumLockMethods >= MAX_LOCK_METHODS)
407                 return INVALID_LOCKMETHOD;
408         if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
409                 return INVALID_LOCKMETHOD;
410
411         /* other modules refer to the lock table by a lockmethod ID */
412         newLockMethod = NumLockMethods;
413         NumLockMethods++;
414
415         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
416         return newLockMethod;
417 }
418
419 /*
420  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
421  *              set lock if/when no conflicts.
422  *
423  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
424  *              a FALSE return is to be expected if dontWait is TRUE;
425  *              but if dontWait is FALSE, only a parameter error can cause
426  *              a FALSE return.  (XXX probably we should just elog on parameter
427  *              errors, instead of conflating this with failure to acquire lock?)
428  *
429  * Side Effects: The lock is acquired and recorded in lock tables.
430  *
431  * NOTE: if we wait for the lock, there is no way to abort the wait
432  * short of aborting the transaction.
433  *
434  *
435  * Note on User Locks:
436  *
437  *              User locks are handled totally on the application side as
438  *              long term cooperative locks which extend beyond the normal
439  *              transaction boundaries.  Their purpose is to indicate to an
440  *              application that someone is `working' on an item.  So it is
441  *              possible to put an user lock on a tuple's oid, retrieve the
442  *              tuple, work on it for an hour and then update it and remove
443  *              the lock.  While the lock is active other clients can still
444  *              read and write the tuple but they can be aware that it has
445  *              been locked at the application level by someone.
446  *              User locks use lock tags made of an uint16 and an uint32, for
447  *              example 0 and a tuple oid, or any other arbitrary pair of
448  *              numbers following a convention established by the application.
449  *              In this sense tags don't refer to tuples or database entities.
450  *              User locks and normal locks are completely orthogonal and
451  *              they don't interfere with each other, so it is possible
452  *              to acquire a normal lock on an user-locked tuple or user-lock
453  *              a tuple for which a normal write lock already exists.
454  *              User locks are always non blocking, therefore they are never
455  *              acquired if already held by another process.  They must be
456  *              released explicitly by the application but they are released
457  *              automatically when a backend terminates.
458  *              They are indicated by a lockmethod 2 which is an alias for the
459  *              normal lock table, and are distinguished from normal locks
460  *              by the following differences:
461  *
462  *                                                                              normal lock             user lock
463  *
464  *              lockmethod                                              1                               2
465  *              tag.dbId                                                database oid    database oid
466  *              tag.relId                                               rel oid or 0    0
467  *              tag.objId                                               block id                lock id2
468  *                                                                              or xact id
469  *              tag.offnum                                              0                               lock id1
470  *              holder.xid                                              xid or 0                0
471  *              persistence                                             transaction             user or backend
472  *                                                                              or backend
473  *
474  *              The lockmode parameter can have the same values for normal locks
475  *              although probably only WRITE_LOCK can have some practical use.
476  *
477  *                                                                                                              DZ - 22 Nov 1997
478  */
479
/*
 * LockAcquire -- acquire a lock of the given mode on the object identified
 * by 'locktag', under transaction 'xid', sleeping if a conflict is found
 * (unless dontWait is true).  See the long comment above for the full
 * contract, return conventions, and user-lock semantics.
 */
bool
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
			TransactionId xid, LOCKMODE lockmode, bool dontWait)
{
	HOLDER	   *holder;
	HOLDERTAG	holdertag;
	HTAB	   *holderTable;
	bool		found;
	LOCK	   *lock;
	SPINLOCK	masterLock;
	LOCKMETHODTABLE *lockMethodTable;
	int			status;
	int			myHolding[MAX_LOCKMODES];	/* this proc's per-mode counts */
	int			i;

#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
		elog(DEBUG, "LockAcquire: user lock [%u] %s",
			 locktag->objId.blkno, lock_mode_names[lockmode]);
#endif

	/* ???????? This must be changed when short term locks will be used */
	locktag->lockmethod = lockmethod;

	/* validate the lockmethod and fetch its table */
	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
	{
		elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
		return FALSE;
	}

	/* when locking is disabled, pretend every acquisition succeeds */
	if (LockingIsDisabled)
		return TRUE;

	masterLock = lockMethodTable->ctl->masterLock;

	SpinAcquire(masterLock);

	/*
	 * Find or create a lock with this tag
	 */
	Assert(lockMethodTable->lockHash->hash == tag_hash);
	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
								HASH_ENTER, &found);
	if (!lock)
	{
		/* hash_search failure here means the shared table is full/broken */
		SpinRelease(masterLock);
		elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
		return FALSE;
	}

	/*
	 * if it's a new lock object, initialize it
	 */
	if (!found)
	{
		lock->grantMask = 0;
		lock->waitMask = 0;
		SHMQueueInit(&(lock->lockHolders));
		ProcQueueInit(&(lock->waitProcs));
		lock->nRequested = 0;
		lock->nGranted = 0;
		MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
		MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
		LOCK_PRINT("LockAcquire: new", lock, lockmode);
	}
	else
	{
		/* existing lock: sanity-check its counters before touching them */
		LOCK_PRINT("LockAcquire: found", lock, lockmode);
		Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
		Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
		Assert(lock->nGranted <= lock->nRequested);
	}

	/*
	 * Create the hash key for the holder table.
	 */
	MemSet(&holdertag, 0, sizeof(HOLDERTAG));	/* must clear padding,
												 * needed */
	holdertag.lock = MAKE_OFFSET(lock);
	holdertag.proc = MAKE_OFFSET(MyProc);
	TransactionIdStore(xid, &holdertag.xid);

	/*
	 * Find or create a holder entry with this tag
	 */
	holderTable = lockMethodTable->holderHash;
	holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
									HASH_ENTER, &found);
	if (!holder)
	{
		SpinRelease(masterLock);
		elog(FATAL, "LockAcquire: holder table corrupted");
		return FALSE;
	}

	/*
	 * If new, initialize the new entry
	 */
	if (!found)
	{
		holder->nHolding = 0;
		MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
		/* Add holder to appropriate lists */
		SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
		SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
		HOLDER_PRINT("LockAcquire: new", holder);
	}
	else
	{
		HOLDER_PRINT("LockAcquire: found", holder);
		Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
		Assert(holder->nHolding <= lock->nGranted);

#ifdef CHECK_DEADLOCK_RISK

		/*
		 * Issue warning if we already hold a lower-level lock on this
		 * object and do not hold a lock of the requested level or higher.
		 * This indicates a deadlock-prone coding practice (eg, we'd have
		 * a deadlock if another backend were following the same code path
		 * at about the same time).
		 *
		 * This is not enabled by default, because it may generate log
		 * entries about user-level coding practices that are in fact safe
		 * in context. It can be enabled to help find system-level
		 * problems.
		 *
		 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
		 * better to use a table.  For now, though, this works.
		 */
		for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
		{
			if (holder->holding[i] > 0)
			{
				if (i >= (int) lockmode)
					break;		/* safe: we have a lock >= req level */
				elog(DEBUG, "Deadlock risk: raising lock level"
					 " from %s to %s on object %u/%u/%u",
					 lock_mode_names[i], lock_mode_names[lockmode],
					 lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
				break;
			}
		}
#endif   /* CHECK_DEADLOCK_RISK */
	}

	/*
	 * lock->nRequested and lock->requested[] count the total number of
	 * requests, whether granted or waiting, so increment those
	 * immediately. The other counts don't increment till we get the lock.
	 */
	lock->nRequested++;
	lock->requested[lockmode]++;
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

	/*
	 * If I already hold one or more locks of the requested type, just
	 * grant myself another one without blocking.
	 */
	if (holder->holding[lockmode] > 0)
	{
		GrantLock(lock, holder, lockmode);
		HOLDER_PRINT("LockAcquire: owning", holder);
		SpinRelease(masterLock);
		return TRUE;
	}

	/*
	 * If this process (under any XID) is a holder of the lock, also grant
	 * myself another one without blocking.
	 */
	LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
	if (myHolding[lockmode] > 0)
	{
		GrantLock(lock, holder, lockmode);
		HOLDER_PRINT("LockAcquire: my other XID owning", holder);
		SpinRelease(masterLock);
		return TRUE;
	}

	/*
	 * If lock requested conflicts with locks requested by waiters, must
	 * join wait queue.  Otherwise, check for conflict with already-held
	 * locks.  (That's last because most complex check.)
	 */
	if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
		status = STATUS_FOUND;
	else
		status = LockCheckConflicts(lockMethodTable, lockmode,
									lock, holder,
									MyProc, myHolding);

	if (status == STATUS_OK)
	{
		/* No conflict with held or previously requested locks */
		GrantLock(lock, holder, lockmode);
	}
	else
	{
		Assert(status == STATUS_FOUND);

		/*
		 * We can't acquire the lock immediately.  If caller specified no
		 * blocking, remove the holder entry and return FALSE without waiting.
		 */
		if (dontWait)
		{
			/* only remove the holder entry if it records no granted locks */
			if (holder->nHolding == 0)
			{
				SHMQueueDelete(&holder->lockLink);
				SHMQueueDelete(&holder->procLink);
				holder = (HOLDER *) hash_search(holderTable,
												(Pointer) holder,
												HASH_REMOVE, &found);
				if (!holder || !found)
					elog(NOTICE, "LockAcquire: remove holder, table corrupted");
			}
			else
				HOLDER_PRINT("LockAcquire: NHOLDING", holder);
			/* undo the speculative request-count increments made above */
			lock->nRequested--;
			lock->requested[lockmode]--;
			LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
			Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
			Assert(lock->nGranted <= lock->nRequested);
			SpinRelease(masterLock);
			return FALSE;
		}

		/*
		 * Construct bitmask of locks this process holds on this object.
		 * (Bit i+1 corresponds to mode i; mode 0 is never used.)
		 */
		{
			int			heldLocks = 0;
			int			tmpMask;

			for (i = 1, tmpMask = 2;
				 i <= lockMethodTable->ctl->numLockModes;
				 i++, tmpMask <<= 1)
			{
				if (myHolding[i] > 0)
					heldLocks |= tmpMask;
			}
			MyProc->heldLocks = heldLocks;
		}

		/*
		 * Sleep till someone wakes me up.
		 */
		status = WaitOnLock(lockmethod, lockmode, lock, holder);

		/*
		 * NOTE: do not do any material change of state between here and
		 * return.	All required changes in locktable state must have been
		 * done when the lock was granted to us --- see notes in
		 * WaitOnLock.
		 */

		/*
		 * Check the holder entry status, in case something in the ipc
		 * communication doesn't work correctly.
		 */
		if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
		{
			HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
			LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
			/* Should we retry ? */
			SpinRelease(masterLock);
			return FALSE;
		}
		HOLDER_PRINT("LockAcquire: granted", holder);
		LOCK_PRINT("LockAcquire: granted", lock, lockmode);
	}

	SpinRelease(masterLock);

	return status == STATUS_OK;
}
758
759 /*
760  * LockCheckConflicts -- test whether requested lock conflicts
761  *              with those already granted
762  *
763  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
764  *
765  * NOTES:
766  *              Here's what makes this complicated: one process's locks don't
767  * conflict with one another, even if they are held under different
768  * transaction IDs (eg, session and xact locks do not conflict).
769  * So, we must subtract off our own locks when determining whether the
770  * requested new lock conflicts with those already held.
771  *
772  * The caller can optionally pass the process's total holding counts, if
773  * known.  If NULL is passed then these values will be computed internally.
774  */
775 int
776 LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
777                                    LOCKMODE lockmode,
778                                    LOCK *lock,
779                                    HOLDER *holder,
780                                    PROC *proc,
781                                    int *myHolding)              /* myHolding[] array or NULL */
782 {
783         LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
784         int                     numLockModes = lockctl->numLockModes;
785         int                     bitmask;
786         int                     i,
787                                 tmpMask;
788         int                     localHolding[MAX_LOCKMODES];
789
790         /*
791          * first check for global conflicts: If no locks conflict with my
792          * request, then I get the lock.
793          *
794          * Checking for conflict: lock->grantMask represents the types of
795          * currently held locks.  conflictTable[lockmode] has a bit set for
796          * each type of lock that conflicts with request.       Bitwise compare
797          * tells if there is a conflict.
798          */
799         if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
800         {
801                 HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
802                 return STATUS_OK;
803         }
804
805         /*
806          * Rats.  Something conflicts. But it could still be my own lock.  We
807          * have to construct a conflict mask that does not reflect our own
808          * locks.  Locks held by the current process under another XID also
809          * count as "our own locks".
810          */
811         if (myHolding == NULL)
812         {
813                 /* Caller didn't do calculation of total holding for me */
814                 LockCountMyLocks(holder->tag.lock, proc, localHolding);
815                 myHolding = localHolding;
816         }
817
818         /* Compute mask of lock types held by other processes */
819         bitmask = 0;
820         tmpMask = 2;
821         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
822         {
823                 if (lock->granted[i] != myHolding[i])
824                         bitmask |= tmpMask;
825         }
826
827         /*
828          * now check again for conflicts.  'bitmask' describes the types of
829          * locks held by other processes.  If one of these conflicts with the
830          * kind of lock that I want, there is a conflict and I have to sleep.
831          */
832         if (!(lockctl->conflictTab[lockmode] & bitmask))
833         {
834                 /* no conflict. OK to get the lock */
835                 HOLDER_PRINT("LockCheckConflicts: resolved", holder);
836                 return STATUS_OK;
837         }
838
839         HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
840         return STATUS_FOUND;
841 }
842
843 /*
844  * LockCountMyLocks --- Count total number of locks held on a given lockable
845  *              object by a given process (under any transaction ID).
846  *
847  * XXX This could be rather slow if the process holds a large number of locks.
848  * Perhaps it could be sped up if we kept yet a third hashtable of per-
849  * process lock information.  However, for the normal case where a transaction
850  * doesn't hold a large number of locks, keeping such a table would probably
851  * be a net slowdown.
852  */
853 static void
854 LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
855 {
856         SHM_QUEUE  *procHolders = &(proc->procHolders);
857         HOLDER     *holder;
858         int                     i;
859
860         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
861
862         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
863                                                                          offsetof(HOLDER, procLink));
864
865         while (holder)
866         {
867                 if (lockOffset == holder->tag.lock)
868                 {
869                         for (i = 1; i < MAX_LOCKMODES; i++)
870                                 myHolding[i] += holder->holding[i];
871                 }
872
873                 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
874                                                                                  offsetof(HOLDER, procLink));
875         }
876 }
877
/*
 * GrantLock -- update the lock and holder data structures to show
 *		the lock request has been granted.
 *
 * Increments both the lock's and the holder's grant-side counters and
 * keeps the lock's grantMask/waitMask bitmasks in step with the counts.
 *
 * NOTE: if proc was blocked, it also needs to be removed from the wait list
 * and have its waitLock/waitHolder fields cleared.  That's not done here.
 */
void
GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
{
	lock->nGranted++;
	lock->granted[lockmode]++;
	/* record that at least one lock of this mode is now granted */
	lock->grantMask |= BITS_ON[lockmode];
	/* if every request of this mode is now granted, no one waits on it */
	if (lock->granted[lockmode] == lock->requested[lockmode])
		lock->waitMask &= BITS_OFF[lockmode];
	LOCK_PRINT("GrantLock", lock, lockmode);
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);
	/* and bump the per-holder counts to match */
	holder->holding[lockmode]++;
	holder->nHolding++;
	Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
}
900
/*
 * WaitOnLock -- wait to acquire a lock
 *
 * Caller must have set MyProc->heldLocks to reflect locks already held
 * on the lockable object by this process (under all XIDs).
 *
 * The locktable spinlock must be held at entry.
 *
 * Returns STATUS_OK once the lock has been granted to us.  On deadlock
 * we elog(ERROR) (after releasing the spinlock) and do not return.
 */
static int
WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
		   LOCK *lock, HOLDER *holder)
{
	LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
	char	   *new_status,
			   *old_status;

	Assert(lockmethod < NumLockMethods);

	LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);

	/*
	 * Append " waiting" to the process title while we sleep.  The +10
	 * leaves room for the 8-character suffix plus the terminating NUL.
	 */
	old_status = pstrdup(get_ps_display());
	new_status = (char *) palloc(strlen(old_status) + 10);
	strcpy(new_status, old_status);
	strcat(new_status, " waiting");
	set_ps_display(new_status);

	/*
	 * NOTE: Think not to put any shared-state cleanup after the call to
	 * ProcSleep, in either the normal or failure path.  The lock state
	 * must be fully set by the lock grantor, or by HandleDeadLock if we
	 * give up waiting for the lock.  This is necessary because of the
	 * possibility that a cancel/die interrupt will interrupt ProcSleep
	 * after someone else grants us the lock, but before we've noticed it.
	 * Hence, after granting, the locktable state must fully reflect the
	 * fact that we own the lock; we can't do additional work on return.
	 * Contrariwise, if we fail, any cleanup must happen in xact abort
	 * processing, not here, to ensure it will also happen in the
	 * cancel/die case.
	 */

	if (ProcSleep(lockMethodTable,
				  lockmode,
				  lock,
				  holder) != STATUS_OK)
	{

		/*
		 * We failed as a result of a deadlock, see HandleDeadLock(). Quit
		 * now.  Removal of the holder and lock objects, if no longer
		 * needed, will happen in xact cleanup (see above for motivation).
		 */
		LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
		SpinRelease(lockMethodTable->ctl->masterLock);
		elog(ERROR, "deadlock detected");
		/* not reached */
	}

	/* Awake and granted: restore the original process title */
	set_ps_display(old_status);
	pfree(old_status);
	pfree(new_status);

	LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
	return STATUS_OK;
}
965
/*
 * Remove a proc from the wait-queue it is on
 * (caller must know it is on one).
 *
 * Locktable lock must be held by caller.
 *
 * NB: this does not remove the process' holder object, nor the lock object,
 * even though their counts might now have gone to zero.  That will happen
 * during a subsequent LockReleaseAll call, which we expect will happen
 * during transaction cleanup.  (Removal of a proc from its wait queue by
 * this routine can only happen if we are aborting the transaction.)
 */
void
RemoveFromWaitQueue(PROC *proc)
{
	LOCK	   *waitLock = proc->waitLock;
	LOCKMODE	lockmode = proc->waitLockMode;

	/* Make sure proc is waiting */
	Assert(proc->links.next != INVALID_OFFSET);
	Assert(waitLock);
	Assert(waitLock->waitProcs.size > 0);

	/* Remove proc from lock's wait queue */
	SHMQueueDelete(&(proc->links));
	waitLock->waitProcs.size--;

	/* Undo increments of request counts by waiting process */
	Assert(waitLock->nRequested > 0);
	Assert(waitLock->nRequested > proc->waitLock->nGranted);
	waitLock->nRequested--;
	Assert(waitLock->requested[lockmode] > 0);
	waitLock->requested[lockmode]--;
	/* don't forget to clear waitMask bit if appropriate */
	if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
		waitLock->waitMask &= BITS_OFF[lockmode];

	/* Clean up the proc's own state so it no longer appears blocked */
	proc->waitLock = NULL;
	proc->waitHolder = NULL;

	/*
	 * See if any other waiters for the lock can be woken up now --
	 * removing our request may have dissolved the conflict that was
	 * blocking them.
	 */
	ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
}
1010
/*
 * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
 *		release one 'lockmode' lock on it.
 *
 * Side Effects: find any waiting processes that are now wakable,
 *		grant them their requested locks and awaken them.
 *		(We have to grant the lock here to avoid a race between
 *		the waking process and any new process to
 *		come along and request the lock.)
 *
 * Returns TRUE on success.  Returns FALSE, after elog(NOTICE), if the
 * lock table is corrupted or we do not actually hold the described lock.
 */
bool
LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
			TransactionId xid, LOCKMODE lockmode)
{
	LOCK	   *lock;
	SPINLOCK	masterLock;
	bool		found;
	LOCKMETHODTABLE *lockMethodTable;
	HOLDER	   *holder;
	HOLDERTAG	holdertag;
	HTAB	   *holderTable;
	bool		wakeupNeeded = false;

#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
		elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
#endif

	/* ???????? This must be changed when short term locks will be used */
	locktag->lockmethod = lockmethod;

	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
	{
		elog(NOTICE, "lockMethodTable is null in LockRelease");
		return FALSE;
	}

	/* nothing to do if the lock manager is globally disabled */
	if (LockingIsDisabled)
		return TRUE;

	masterLock = lockMethodTable->ctl->masterLock;
	SpinAcquire(masterLock);

	/*
	 * Find a lock with this tag
	 */
	Assert(lockMethodTable->lockHash->hash == tag_hash);
	lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
								HASH_FIND, &found);

	/*
	 * let the caller print its own error message, too. Do not
	 * elog(ERROR).
	 */
	if (!lock)
	{
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: locktable corrupted");
		return FALSE;
	}

	if (!found)
	{
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: no such lock");
		return FALSE;
	}
	LOCK_PRINT("LockRelease: found", lock, lockmode);

	/*
	 * Find the holder entry for this holder.  HASH_FIND_SAVE remembers
	 * the entry's position so the HASH_REMOVE_SAVED below can delete it
	 * without a second lookup.
	 */
	MemSet(&holdertag, 0, sizeof(HOLDERTAG));	/* must clear padding,
												 * needed */
	holdertag.lock = MAKE_OFFSET(lock);
	holdertag.proc = MAKE_OFFSET(MyProc);
	TransactionIdStore(xid, &holdertag.xid);

	holderTable = lockMethodTable->holderHash;
	holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
									HASH_FIND_SAVE, &found);
	if (!holder || !found)
	{
		SpinRelease(masterLock);
#ifdef USER_LOCKS
		if (!found && lockmethod == USER_LOCKMETHOD)
			elog(NOTICE, "LockRelease: no lock with this tag");
		else
#endif
			elog(NOTICE, "LockRelease: holder table corrupted");
		return FALSE;
	}
	HOLDER_PRINT("LockRelease: found", holder);

	/*
	 * Check that we are actually holding a lock of the type we want to
	 * release.
	 */
	if (!(holder->holding[lockmode] > 0))
	{
		HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
		Assert(holder->holding[lockmode] >= 0);
		SpinRelease(masterLock);
		elog(NOTICE, "LockRelease: you don't own a lock of type %s",
			 lock_mode_names[lockmode]);
		return FALSE;
	}
	Assert(holder->nHolding > 0);
	Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
	Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
	Assert(lock->nGranted <= lock->nRequested);

	/*
	 * fix the general lock stats
	 */
	lock->nRequested--;
	lock->requested[lockmode]--;
	lock->nGranted--;
	lock->granted[lockmode]--;

	if (lock->granted[lockmode] == 0)
	{
		/* change the conflict mask.  No more of this lock type. */
		lock->grantMask &= BITS_OFF[lockmode];
	}

	LOCK_PRINT("LockRelease: updated", lock, lockmode);
	Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
	Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
	Assert(lock->nGranted <= lock->nRequested);

	/*
	 * We need only run ProcLockWakeup if the released lock conflicts with
	 * at least one of the lock types requested by waiter(s).  Otherwise
	 * whatever conflict made them wait must still exist.  NOTE: before
	 * MVCC, we could skip wakeup if lock->granted[lockmode] was still
	 * positive. But that's not true anymore, because the remaining
	 * granted locks might belong to some waiter, who could now be
	 * awakened because he doesn't conflict with his own locks.
	 */
	if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
		wakeupNeeded = true;

	if (lock->nRequested == 0)
	{

		/*
		 * if there's no one waiting in the queue, we just released the
		 * last lock on this object. Delete it from the lock table.
		 */
		Assert(lockMethodTable->lockHash->hash == tag_hash);
		lock = (LOCK *) hash_search(lockMethodTable->lockHash,
									(Pointer) &(lock->tag),
									HASH_REMOVE,
									&found);
		if (!lock || !found)
		{
			SpinRelease(masterLock);
			elog(NOTICE, "LockRelease: remove lock, table corrupted");
			return FALSE;
		}
		wakeupNeeded = false;	/* should be false, but make sure */
	}

	/*
	 * Now fix the per-holder lock stats.
	 */
	holder->holding[lockmode]--;
	holder->nHolding--;
	HOLDER_PRINT("LockRelease: updated", holder);
	Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));

	/*
	 * If this was my last hold on this lock, delete my entry in the
	 * holder table.  Uses the position saved by HASH_FIND_SAVE above.
	 */
	if (holder->nHolding == 0)
	{
		HOLDER_PRINT("LockRelease: deleting", holder);
		SHMQueueDelete(&holder->lockLink);
		SHMQueueDelete(&holder->procLink);
		holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
										HASH_REMOVE_SAVED, &found);
		if (!holder || !found)
		{
			SpinRelease(masterLock);
			elog(NOTICE, "LockRelease: remove holder, table corrupted");
			return FALSE;
		}
	}

	/*
	 * Wake up waiters if needed.
	 */
	if (wakeupNeeded)
		ProcLockWakeup(lockMethodTable, lock);

	SpinRelease(masterLock);
	return TRUE;
}
1213
/*
 * LockReleaseAll -- Release all locks in a process's lock list.
 *
 * Well, not really *all* locks.
 *
 * If 'allxids' is TRUE, all locks of the specified lock method are
 * released, regardless of transaction affiliation.
 *
 * If 'allxids' is FALSE, all locks of the specified lock method and
 * specified XID are released.
 *
 * Returns TRUE on success, FALSE (after elog(NOTICE)) if the shared
 * hash tables are found to be corrupted.
 */
bool
LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
			   bool allxids, TransactionId xid)
{
	SHM_QUEUE  *procHolders = &(proc->procHolders);
	HOLDER	   *holder;
	HOLDER	   *nextHolder;
	SPINLOCK	masterLock;
	LOCKMETHODTABLE *lockMethodTable;
	int			i,
				numLockModes;
	LOCK	   *lock;
	bool		found;

#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
		elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
			 lockmethod, proc->pid);
#endif

	Assert(lockmethod < NumLockMethods);
	lockMethodTable = LockMethodTable[lockmethod];
	if (!lockMethodTable)
	{
		elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
		return FALSE;
	}

	numLockModes = lockMethodTable->ctl->numLockModes;
	masterLock = lockMethodTable->ctl->masterLock;

	SpinAcquire(masterLock);

	/* walk the process's holder list, releasing matching entries */
	holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
									 offsetof(HOLDER, procLink));

	while (holder)
	{
		bool		wakeupNeeded = false;

		/* Get link first, since we may unlink/delete this holder */
		nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
											 offsetof(HOLDER, procLink));

		Assert(holder->tag.proc == MAKE_OFFSET(proc));

		lock = (LOCK *) MAKE_PTR(holder->tag.lock);

		/* Ignore items that are not of the lockmethod to be removed */
		if (LOCK_LOCKMETHOD(*lock) != lockmethod)
			goto next_item;

		/* If not allxids, ignore items that are of the wrong xid */
		if (!allxids && !TransactionIdEquals(xid, holder->tag.xid))
			goto next_item;

		HOLDER_PRINT("LockReleaseAll", holder);
		LOCK_PRINT("LockReleaseAll", lock, 0);
		Assert(lock->nRequested >= 0);
		Assert(lock->nGranted >= 0);
		Assert(lock->nGranted <= lock->nRequested);
		Assert(holder->nHolding >= 0);
		Assert(holder->nHolding <= lock->nRequested);

		/*
		 * fix the general lock stats
		 */
		if (lock->nRequested != holder->nHolding)
		{
			/*
			 * Other processes also hold or request this lock: subtract
			 * our per-mode counts from the lock's totals, one mode at a
			 * time, keeping grantMask in step.
			 */
			for (i = 1; i <= numLockModes; i++)
			{
				Assert(holder->holding[i] >= 0);
				if (holder->holding[i] > 0)
				{
					lock->requested[i] -= holder->holding[i];
					lock->granted[i] -= holder->holding[i];
					Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
					if (lock->granted[i] == 0)
						lock->grantMask &= BITS_OFF[i];

					/*
					 * Read comments in LockRelease
					 */
					if (!wakeupNeeded &&
					lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
						wakeupNeeded = true;
				}
			}
			lock->nRequested -= holder->nHolding;
			lock->nGranted -= holder->nHolding;
			Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
			Assert(lock->nGranted <= lock->nRequested);
		}
		else
		{
			/*
			 * This holder accounts for all the requested locks on the
			 * object, so we can be lazy and just zero things out.
			 */
			lock->nRequested = 0;
			lock->nGranted = 0;
			/* Fix the lock status, just for next LOCK_PRINT message. */
			for (i = 1; i <= numLockModes; i++)
			{
				Assert(lock->requested[i] == lock->granted[i]);
				lock->requested[i] = lock->granted[i] = 0;
			}
		}
		LOCK_PRINT("LockReleaseAll: updated", lock, 0);

		HOLDER_PRINT("LockReleaseAll: deleting", holder);

		/*
		 * Remove the holder entry from the linked lists
		 */
		SHMQueueDelete(&holder->lockLink);
		SHMQueueDelete(&holder->procLink);

		/*
		 * remove the holder entry from the hashtable
		 */
		holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
										(Pointer) holder,
										HASH_REMOVE,
										&found);
		if (!holder || !found)
		{
			SpinRelease(masterLock);
			elog(NOTICE, "LockReleaseAll: holder table corrupted");
			return FALSE;
		}

		if (lock->nRequested == 0)
		{

			/*
			 * We've just released the last lock, so garbage-collect the
			 * lock object.
			 */
			LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
			Assert(lockMethodTable->lockHash->hash == tag_hash);
			lock = (LOCK *) hash_search(lockMethodTable->lockHash,
										(Pointer) &(lock->tag),
										HASH_REMOVE, &found);
			if (!lock || !found)
			{
				SpinRelease(masterLock);
				elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
				return FALSE;
			}
		}
		else if (wakeupNeeded)
			ProcLockWakeup(lockMethodTable, lock);

next_item:
		holder = nextHolder;
	}

	SpinRelease(masterLock);

#ifdef LOCK_DEBUG
	if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
		elog(DEBUG, "LockReleaseAll: done");
#endif

	return TRUE;
}
1392
1393 int
1394 LockShmemSize(int maxBackends)
1395 {
1396         int                     size = 0;
1397         long            max_table_size = NLOCKENTS(maxBackends);
1398
1399         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1400         size += maxBackends * MAXALIGN(sizeof(PROC));           /* each MyProc */
1401         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each
1402                                                                                                                                  * lockMethodTable->ctl */
1403
1404         /* lockHash table */
1405         size += hash_estimate_size(max_table_size,
1406                                                            SHMEM_LOCKTAB_KEYSIZE,
1407                                                            SHMEM_LOCKTAB_DATASIZE);
1408
1409         /* holderHash table */
1410         size += hash_estimate_size(max_table_size,
1411                                                            SHMEM_HOLDERTAB_KEYSIZE,
1412                                                            SHMEM_HOLDERTAB_DATASIZE);
1413
1414         /*
1415          * Since the lockHash entry count above is only an estimate, add 10%
1416          * safety margin.
1417          */
1418         size += size / 10;
1419
1420         return size;
1421 }
1422
1423
1424 #ifdef LOCK_DEBUG
1425 /*
1426  * Dump all locks in the proc->procHolders list.
1427  *
1428  * Must have already acquired the masterLock.
1429  */
1430 void
1431 DumpLocks(void)
1432 {
1433         SHMEM_OFFSET location;
1434         PROC       *proc;
1435         SHM_QUEUE  *procHolders;
1436         HOLDER     *holder;
1437         LOCK       *lock;
1438         int                     lockmethod = DEFAULT_LOCKMETHOD;
1439         LOCKMETHODTABLE *lockMethodTable;
1440
1441         ShmemPIDLookup(MyProcPid, &location);
1442         if (location == INVALID_OFFSET)
1443                 return;
1444         proc = (PROC *) MAKE_PTR(location);
1445         if (proc != MyProc)
1446                 return;
1447         procHolders = &proc->procHolders;
1448
1449         Assert(lockmethod < NumLockMethods);
1450         lockMethodTable = LockMethodTable[lockmethod];
1451         if (!lockMethodTable)
1452                 return;
1453
1454         if (proc->waitLock)
1455                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1456
1457         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1458                                                                          offsetof(HOLDER, procLink));
1459
1460         while (holder)
1461         {
1462                 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1463
1464                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1465
1466                 HOLDER_PRINT("DumpLocks", holder);
1467                 LOCK_PRINT("DumpLocks", lock, 0);
1468
1469                 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1470                                                                                  offsetof(HOLDER, procLink));
1471         }
1472 }
1473
1474 /*
1475  * Dump all postgres locks. Must have already acquired the masterLock.
1476  */
1477 void
1478 DumpAllLocks(void)
1479 {
1480         SHMEM_OFFSET location;
1481         PROC       *proc;
1482         HOLDER     *holder = NULL;
1483         LOCK       *lock;
1484         int                     pid;
1485         int                     lockmethod = DEFAULT_LOCKMETHOD;
1486         LOCKMETHODTABLE *lockMethodTable;
1487         HTAB       *holderTable;
1488         HASH_SEQ_STATUS status;
1489
1490         pid = getpid();
1491         ShmemPIDLookup(pid, &location);
1492         if (location == INVALID_OFFSET)
1493                 return;
1494         proc = (PROC *) MAKE_PTR(location);
1495         if (proc != MyProc)
1496                 return;
1497
1498         Assert(lockmethod < NumLockMethods);
1499         lockMethodTable = LockMethodTable[lockmethod];
1500         if (!lockMethodTable)
1501                 return;
1502
1503         holderTable = lockMethodTable->holderHash;
1504
1505         if (proc->waitLock)
1506                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1507
1508         hash_seq_init(&status, holderTable);
1509         while ((holder = (HOLDER *) hash_seq_search(&status)) &&
1510                    (holder != (HOLDER *) TRUE))
1511         {
1512                 HOLDER_PRINT("DumpAllLocks", holder);
1513
1514                 if (holder->tag.lock)
1515                 {
1516                         lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1517                         LOCK_PRINT("DumpAllLocks", lock, 0);
1518                 }
1519                 else
1520                         elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
1521         }
1522 }
1523
1524 #endif   /* LOCK_DEBUG */