]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
pgindent run on all C files. Java run to follow. initdb/regression
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.101 2001/10/25 05:49:42 momjian Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll,
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <sys/types.h>
34 #include <unistd.h>
35 #include <signal.h>
36
37 #include "access/xact.h"
38 #include "miscadmin.h"
39 #include "storage/proc.h"
40 #include "utils/memutils.h"
41 #include "utils/ps_status.h"
42
43
/*
 * This configuration variable is used to set the lock table size.
 * It is multiplied by the number of backends (see NLOCKENTS) to obtain the
 * maximum number of entries requested for the lock and holder hash tables.
 */
int                     max_locks_per_xact; /* set by guc.c */

/* Maximum number of hash table entries to request for maxBackends backends */
#define NLOCKENTS(maxBackends)  (max_locks_per_xact * (maxBackends))
48
49
/*
 * Forward declarations of file-local helpers used by LockAcquire().
 *
 * WaitOnLock is called when the requested lock cannot be granted
 * immediately; its int result is used as LockAcquire's status.
 * LockCountMyLocks fills myHolding[] (indexed by lock mode) for the given
 * lock and process.
 */
static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
                   LOCK *lock, HOLDER *holder);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
                                 int *myHolding);
54
/*
 * Printable names of the lock modes, indexed by LOCKMODE value.
 * Index 0 is not a real lock mode, hence the "INVALID" placeholder.
 * Used only for log/debug messages in this file.
 */
static char *lock_mode_names[] =
{
                                "INVALID",
                                "AccessShareLock",
                                "RowShareLock",
                                "RowExclusiveLock",
                                "ShareUpdateExclusiveLock",
                                "ShareLock",
                                "ShareRowExclusiveLock",
                                "ExclusiveLock",
                                "AccessExclusiveLock"
};
67
68
69 #ifdef LOCK_DEBUG
70
71 /*------
72  * The following configuration options are available for lock debugging:
73  *
74  *         TRACE_LOCKS          -- emit detailed trace output about what this file is doing
75  *         TRACE_USERLOCKS      -- same but for user locks
76  *         TRACE_LOCK_OIDMIN -- do not trace locks for tables below this oid
77  *                                                 (use to avoid output on system tables)
78  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
79  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
80  *
81  * Furthermore, but in storage/lmgr/lwlock.c:
82  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
83  *
84  * Define LOCK_DEBUG at compile time to get all these enabled.
85  * --------
86  */
87
/* locks on relations with an OID below this value are not traced */
int                     Trace_lock_oidmin = BootstrapObjectIdData;
/* trace locks taken through DEFAULT_LOCKMETHOD */
bool            Trace_locks = false;
/* trace locks taken through USER_LOCKMETHOD */
bool            Trace_userlocks = false;
/* if nonzero, relation OID whose locks are traced unconditionally */
int                     Trace_lock_table = 0;
/* deadlock debugging switch; not referenced in the tracing helpers below */
bool            Debug_deadlocks = false;
93
94
95 inline static bool
96 LOCK_DEBUG_ENABLED(const LOCK *lock)
97 {
98         return
99                 (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
100            || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
101                  && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
102                 || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
103 }
104
105
/*
 * LOCK_PRINT -- log the state of a LOCK object at DEBUG level.
 *
 * where: caller-supplied prefix identifying the call site.
 * lock:  the lock to describe; nothing is printed unless
 *        LOCK_DEBUG_ENABLED(lock) is true.
 * type:  lock mode of interest; used as an index into lock_mode_names[].
 *
 * Note that only the per-mode counters for modes 1..7 are printed,
 * followed by the respective totals (nRequested / nGranted).
 */
inline static void
LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
{
        if (LOCK_DEBUG_ENABLED(lock))
                elog(DEBUG,
                         "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
                         "req(%d,%d,%d,%d,%d,%d,%d)=%d "
                         "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
                         where, MAKE_OFFSET(lock),
                         lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
                         lock->tag.objId.blkno, lock->grantMask,
                         lock->requested[1], lock->requested[2], lock->requested[3],
                         lock->requested[4], lock->requested[5], lock->requested[6],
                         lock->requested[7], lock->nRequested,
                         lock->granted[1], lock->granted[2], lock->granted[3],
                         lock->granted[4], lock->granted[5], lock->granted[6],
                         lock->granted[7], lock->nGranted,
                         lock->waitProcs.size, lock_mode_names[type]);
}
125
126
127 inline static void
128 HOLDER_PRINT(const char *where, const HOLDER *holderP)
129 {
130         if (
131          (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
132            || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
133           && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
134                 || (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
135                 )
136                 elog(DEBUG,
137                          "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
138                          where, MAKE_OFFSET(holderP), holderP->tag.lock,
139                          HOLDER_LOCKMETHOD(*(holderP)),
140                          holderP->tag.proc, holderP->tag.xid,
141                    holderP->holding[1], holderP->holding[2], holderP->holding[3],
142                    holderP->holding[4], holderP->holding[5], holderP->holding[6],
143                          holderP->holding[7], holderP->nHolding);
144 }
145
146 #else                                                   /* not LOCK_DEBUG */
147
/* Without LOCK_DEBUG the trace helpers expand to nothing. */
#define LOCK_PRINT(where, lock, type)
#define HOLDER_PRINT(where, holderP)
150 #endif   /* not LOCK_DEBUG */
151
152
153 /*
154  * These are to simplify/speed up some bit arithmetic.
155  *
156  * XXX is a fetch from a static array really faster than a shift?
157  * Wouldn't bet on it...
158  */
159
/*
 * Filled in by InitLocks(): BITS_ON[i] has only bit i set, and
 * BITS_OFF[i] is its bitwise complement.
 */
static LOCKMASK BITS_OFF[MAX_LOCKMODES];
static LOCKMASK BITS_ON[MAX_LOCKMODES];

/*
 * map from lockmethod to the lock table structure
 *
 * Indexed by lockmethod ID; several IDs may point at the same table
 * (see LockMethodTableRename).
 */
static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];

/*
 * Next unassigned lockmethod ID.  LockMethodTableInit starts it at 1,
 * so slot 0 of LockMethodTable is never assigned.
 */
static int      NumLockMethods;
169
170 /*
171  * InitLocks -- Init the lock module.  Create a private data
172  *              structure for constructing conflict masks.
173  */
174 void
175 InitLocks(void)
176 {
177         int                     i;
178         int                     bit;
179
180         bit = 1;
181         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
182         {
183                 BITS_ON[i] = bit;
184                 BITS_OFF[i] = ~bit;
185         }
186 }
187
188
189 /*
190  * Fetch the lock method table associated with a given lock
191  */
192 LOCKMETHODTABLE *
193 GetLocksMethodTable(LOCK *lock)
194 {
195         LOCKMETHOD      lockmethod = LOCK_LOCKMETHOD(*lock);
196
197         Assert(lockmethod > 0 && lockmethod < NumLockMethods);
198         return LockMethodTable[lockmethod];
199 }
200
201
202 /*
203  * LockMethodInit -- initialize the lock table's lock type
204  *              structures
205  *
206  * Notes: just copying.  Should only be called once.
207  */
208 static void
209 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
210                            LOCKMASK *conflictsP,
211                            int *prioP,
212                            int numModes)
213 {
214         int                     i;
215
216         lockMethodTable->ctl->numLockModes = numModes;
217         numModes++;
218         for (i = 0; i < numModes; i++, prioP++, conflictsP++)
219         {
220                 lockMethodTable->ctl->conflictTab[i] = *conflictsP;
221                 lockMethodTable->ctl->prio[i] = *prioP;
222         }
223 }
224
225 /*
226  * LockMethodTableInit -- initialize a lock table structure
227  *
228  * NOTE: data structures allocated here are allocated permanently, using
229  * TopMemoryContext and shared memory.  We don't ever release them anyway,
230  * and in normal multi-backend operation the lock table structures set up
231  * by the postmaster are inherited by each backend, so they must be in
232  * TopMemoryContext.
233  */
234 LOCKMETHOD
235 LockMethodTableInit(char *tabName,
236                                         LOCKMASK *conflictsP,
237                                         int *prioP,
238                                         int numModes,
239                                         int maxBackends)
240 {
241         LOCKMETHODTABLE *lockMethodTable;
242         char       *shmemName;
243         HASHCTL         info;
244         int                     hash_flags;
245         bool            found;
246         long            init_table_size,
247                                 max_table_size;
248
249         if (numModes >= MAX_LOCKMODES)
250         {
251                 elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
252                          numModes, MAX_LOCKMODES);
253                 return INVALID_LOCKMETHOD;
254         }
255
256         /* Compute init/max size to request for lock hashtables */
257         max_table_size = NLOCKENTS(maxBackends);
258         init_table_size = max_table_size / 10;
259
260         /* Allocate a string for the shmem index table lookups. */
261         /* This is just temp space in this routine, so palloc is OK. */
262         shmemName = (char *) palloc(strlen(tabName) + 32);
263
264         /* each lock table has a non-shared, permanent header */
265         lockMethodTable = (LOCKMETHODTABLE *)
266                 MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODTABLE));
267
268         /*
269          * Lock the LWLock for the table (probably not necessary here)
270          */
271         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
272
273         /*
274          * allocate a control structure from shared memory or attach to it if
275          * it already exists.
276          */
277         sprintf(shmemName, "%s (ctl)", tabName);
278         lockMethodTable->ctl = (LOCKMETHODCTL *)
279                 ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);
280
281         if (!lockMethodTable->ctl)
282                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
283
284         /*
285          * no zero-th table
286          */
287         NumLockMethods = 1;
288
289         /*
290          * we're first - initialize
291          */
292         if (!found)
293         {
294                 MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
295                 lockMethodTable->ctl->masterLock = LockMgrLock;
296                 lockMethodTable->ctl->lockmethod = NumLockMethods;
297         }
298
299         /*
300          * other modules refer to the lock table by a lockmethod ID
301          */
302         LockMethodTable[NumLockMethods] = lockMethodTable;
303         NumLockMethods++;
304         Assert(NumLockMethods <= MAX_LOCK_METHODS);
305
306         /*
307          * allocate a hash table for LOCK structs.      This is used to store
308          * per-locked-object information.
309          */
310         info.keysize = sizeof(LOCKTAG);
311         info.entrysize = sizeof(LOCK);
312         info.hash = tag_hash;
313         hash_flags = (HASH_ELEM | HASH_FUNCTION);
314
315         sprintf(shmemName, "%s (lock hash)", tabName);
316         lockMethodTable->lockHash = ShmemInitHash(shmemName,
317                                                                                           init_table_size,
318                                                                                           max_table_size,
319                                                                                           &info,
320                                                                                           hash_flags);
321
322         if (!lockMethodTable->lockHash)
323                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
324         Assert(lockMethodTable->lockHash->hash == tag_hash);
325
326         /*
327          * allocate a hash table for HOLDER structs.  This is used to store
328          * per-lock-holder information.
329          */
330         info.keysize = sizeof(HOLDERTAG);
331         info.entrysize = sizeof(HOLDER);
332         info.hash = tag_hash;
333         hash_flags = (HASH_ELEM | HASH_FUNCTION);
334
335         sprintf(shmemName, "%s (holder hash)", tabName);
336         lockMethodTable->holderHash = ShmemInitHash(shmemName,
337                                                                                                 init_table_size,
338                                                                                                 max_table_size,
339                                                                                                 &info,
340                                                                                                 hash_flags);
341
342         if (!lockMethodTable->holderHash)
343                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
344
345         /* init ctl data structures */
346         LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
347
348         LWLockRelease(LockMgrLock);
349
350         pfree(shmemName);
351
352         return lockMethodTable->ctl->lockmethod;
353 }
354
355 /*
356  * LockMethodTableRename -- allocate another lockmethod ID to the same
357  *              lock table.
358  *
359  * NOTES: Both the lock module and the lock chain (lchain.c)
360  *              module use table id's to distinguish between different
361  *              kinds of locks.  Short term and long term locks look
362  *              the same to the lock table, but are handled differently
363  *              by the lock chain manager.      This function allows the
364  *              client to use different lockmethods when acquiring/releasing
365  *              short term and long term locks, yet store them all in one hashtable.
366  */
367
368 LOCKMETHOD
369 LockMethodTableRename(LOCKMETHOD lockmethod)
370 {
371         LOCKMETHOD      newLockMethod;
372
373         if (NumLockMethods >= MAX_LOCK_METHODS)
374                 return INVALID_LOCKMETHOD;
375         if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
376                 return INVALID_LOCKMETHOD;
377
378         /* other modules refer to the lock table by a lockmethod ID */
379         newLockMethod = NumLockMethods;
380         NumLockMethods++;
381
382         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
383         return newLockMethod;
384 }
385
386 /*
387  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
388  *              set lock if/when no conflicts.
389  *
390  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
391  *              a FALSE return is to be expected if dontWait is TRUE;
392  *              but if dontWait is FALSE, only a parameter error can cause
393  *              a FALSE return.  (XXX probably we should just elog on parameter
394  *              errors, instead of conflating this with failure to acquire lock?)
395  *
396  * Side Effects: The lock is acquired and recorded in lock tables.
397  *
398  * NOTE: if we wait for the lock, there is no way to abort the wait
399  * short of aborting the transaction.
400  *
401  *
402  * Note on User Locks:
403  *
404  *              User locks are handled totally on the application side as
405  *              long term cooperative locks which extend beyond the normal
406  *              transaction boundaries.  Their purpose is to indicate to an
407  *              application that someone is `working' on an item.  So it is
408  *              possible to put a user lock on a tuple's oid, retrieve the
408  *              possible to put an user lock on a tuple's oid, retrieve the
409  *              tuple, work on it for an hour and then update it and remove
410  *              the lock.  While the lock is active other clients can still
411  *              read and write the tuple but they can be aware that it has
412  *              been locked at the application level by someone.
413  *              User locks use lock tags made of an uint16 and an uint32, for
414  *              example 0 and a tuple oid, or any other arbitrary pair of
415  *              numbers following a convention established by the application.
416  *              In this sense tags don't refer to tuples or database entities.
417  *              User locks and normal locks are completely orthogonal and
418  *              they don't interfere with each other, so it is possible
419  *              to acquire a normal lock on an user-locked tuple or user-lock
420  *              a tuple for which a normal write lock already exists.
421  *              User locks are always non blocking, therefore they are never
422  *              acquired if already held by another process.  They must be
423  *              released explicitly by the application but they are released
424  *              automatically when a backend terminates.
425  *              They are indicated by a lockmethod 2 which is an alias for the
426  *              normal lock table, and are distinguished from normal locks
427  *              by the following differences:
428  *
429  *                                                                              normal lock             user lock
430  *
431  *              lockmethod                                              1                               2
432  *              tag.dbId                                                database oid    database oid
433  *              tag.relId                                               rel oid or 0    0
434  *              tag.objId                                               block id                lock id2
435  *                                                                              or xact id
436  *              tag.offnum                                              0                               lock id1
437  *              holder.xid                                              xid or 0                0
438  *              persistence                                             transaction             user or backend
439  *                                                                              or backend
440  *
441  *              The lockmode parameter can have the same values for normal locks
442  *              although probably only WRITE_LOCK can have some practical use.
443  *
444  *                                                                                                              DZ - 22 Nov 1997
445  */
446
/*
 * Parameter summary:
 *
 *      lockmethod      index into LockMethodTable[] selecting the lock table;
 *                              it is also stored into locktag->lockmethod.
 *      locktag         identifies the object to lock; used as the key of the
 *                              lock hash table.  NOTE: its lockmethod field is
 *                              overwritten by this function.
 *      xid                     stored in the holder tag, so holder entries are kept
 *                              per (lock, process, xid).
 *      lockmode        requested mode; used as an index into the per-mode
 *                              counter arrays.
 *      dontWait        if true, a conflicting request fails immediately
 *                              instead of sleeping.
 *
 * All lock table manipulation is done while holding the table's masterLock,
 * which is released on every return path.
 */
bool
LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                        TransactionId xid, LOCKMODE lockmode, bool dontWait)
{
        HOLDER     *holder;
        HOLDERTAG       holdertag;
        HTAB       *holderTable;
        bool            found;
        LOCK       *lock;
        LWLockId        masterLock;
        LOCKMETHODTABLE *lockMethodTable;
        int                     status;
        int                     myHolding[MAX_LOCKMODES];
        int                     i;

#ifdef LOCK_DEBUG
        if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
                elog(DEBUG, "LockAcquire: user lock [%u] %s",
                         locktag->objId.blkno, lock_mode_names[lockmode]);
#endif

        /* ???????? This must be changed when short term locks will be used */
        locktag->lockmethod = lockmethod;

        /* Range is only checked in assert-enabled builds. */
        Assert(lockmethod < NumLockMethods);
        lockMethodTable = LockMethodTable[lockmethod];
        if (!lockMethodTable)
        {
                elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
                return FALSE;
        }

        masterLock = lockMethodTable->ctl->masterLock;

        LWLockAcquire(masterLock, LW_EXCLUSIVE);

        /*
         * Find or create a lock with this tag
         */
        Assert(lockMethodTable->lockHash->hash == tag_hash);
        lock = (LOCK *) hash_search(lockMethodTable->lockHash,
                                                                (void *) locktag,
                                                                HASH_ENTER, &found);
        if (!lock)
        {
                /* release the LWLock before reporting the error */
                LWLockRelease(masterLock);
                elog(ERROR, "LockAcquire: lock table %d is out of memory",
                         lockmethod);
                /* NOTE(review): presumably not reached if elog(ERROR) does not return */
                return FALSE;
        }

        /*
         * if it's a new lock object, initialize it
         */
        if (!found)
        {
                lock->grantMask = 0;
                lock->waitMask = 0;
                SHMQueueInit(&(lock->lockHolders));
                ProcQueueInit(&(lock->waitProcs));
                lock->nRequested = 0;
                lock->nGranted = 0;
                MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
                MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
                LOCK_PRINT("LockAcquire: new", lock, lockmode);
        }
        else
        {
                /* existing lock object: sanity-check its counters */
                LOCK_PRINT("LockAcquire: found", lock, lockmode);
                Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
                Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
                Assert(lock->nGranted <= lock->nRequested);
        }

        /*
         * Create the hash key for the holder table.
         */
        MemSet(&holdertag, 0, sizeof(HOLDERTAG));       /* must clear padding:
                                                                                                 * the holder hash key
                                                                                                 * covers the whole
                                                                                                 * HOLDERTAG struct */
        holdertag.lock = MAKE_OFFSET(lock);
        holdertag.proc = MAKE_OFFSET(MyProc);
        TransactionIdStore(xid, &holdertag.xid);

        /*
         * Find or create a holder entry with this tag
         */
        holderTable = lockMethodTable->holderHash;
        holder = (HOLDER *) hash_search(holderTable,
                                                                        (void *) &holdertag,
                                                                        HASH_ENTER, &found);
        if (!holder)
        {
                /* release the LWLock before reporting the error */
                LWLockRelease(masterLock);
                elog(ERROR, "LockAcquire: holder table out of memory");
                /* NOTE(review): presumably not reached if elog(ERROR) does not return */
                return FALSE;
        }

        /*
         * If new, initialize the new entry
         */
        if (!found)
        {
                holder->nHolding = 0;
                MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
                /* Add holder to appropriate lists */
                SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
                SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
                HOLDER_PRINT("LockAcquire: new", holder);
        }
        else
        {
                HOLDER_PRINT("LockAcquire: found", holder);
                Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
                Assert(holder->nHolding <= lock->nGranted);

#ifdef CHECK_DEADLOCK_RISK

                /*
                 * Issue warning if we already hold a lower-level lock on this
                 * object and do not hold a lock of the requested level or higher.
                 * This indicates a deadlock-prone coding practice (eg, we'd have
                 * a deadlock if another backend were following the same code path
                 * at about the same time).
                 *
                 * This is not enabled by default, because it may generate log
                 * entries about user-level coding practices that are in fact safe
                 * in context. It can be enabled to help find system-level
                 * problems.
                 *
                 * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
                 * better to use a table.  For now, though, this works.
                 */
                /* scan from the highest mode down; stop at the first mode held */
                for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
                {
                        if (holder->holding[i] > 0)
                        {
                                if (i >= (int) lockmode)
                                        break;          /* safe: we have a lock >= req level */
                                elog(DEBUG, "Deadlock risk: raising lock level"
                                         " from %s to %s on object %u/%u/%u",
                                         lock_mode_names[i], lock_mode_names[lockmode],
                                 lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
                                break;
                        }
                }
#endif   /* CHECK_DEADLOCK_RISK */
        }

        /*
         * lock->nRequested and lock->requested[] count the total number of
         * requests, whether granted or waiting, so increment those
         * immediately. The other counts don't increment till we get the lock.
         */
        lock->nRequested++;
        lock->requested[lockmode]++;
        Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));

        /*
         * If I already hold one or more locks of the requested type, just
         * grant myself another one without blocking.
         */
        if (holder->holding[lockmode] > 0)
        {
                GrantLock(lock, holder, lockmode);
                HOLDER_PRINT("LockAcquire: owning", holder);
                LWLockRelease(masterLock);
                return TRUE;
        }

        /*
         * If this process (under any XID) is a holder of the lock, also grant
         * myself another one without blocking.
         */
        LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
        if (myHolding[lockmode] > 0)
        {
                GrantLock(lock, holder, lockmode);
                HOLDER_PRINT("LockAcquire: my other XID owning", holder);
                LWLockRelease(masterLock);
                return TRUE;
        }

        /*
         * If lock requested conflicts with locks requested by waiters, must
         * join wait queue.  Otherwise, check for conflict with already-held
         * locks.  (That's last because most complex check.)
         */
        if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
                status = STATUS_FOUND;
        else
                status = LockCheckConflicts(lockMethodTable, lockmode,
                                                                        lock, holder,
                                                                        MyProc, myHolding);

        if (status == STATUS_OK)
        {
                /* No conflict with held or previously requested locks */
                GrantLock(lock, holder, lockmode);
        }
        else
        {
                Assert(status == STATUS_FOUND);

                /*
                 * We can't acquire the lock immediately.  If caller specified no
                 * blocking, remove the holder entry and return FALSE without
                 * waiting.
                 */
                if (dontWait)
                {
                        /*
                         * Only discard the holder entry if it records no held locks;
                         * otherwise it is still needed and is left in place.
                         */
                        if (holder->nHolding == 0)
                        {
                                SHMQueueDelete(&holder->lockLink);
                                SHMQueueDelete(&holder->procLink);
                                /*
                                 * NOTE(review): the holder entry itself is passed as the
                                 * hash key; presumably its tag is the first member of
                                 * HOLDER -- confirm against the struct definition.
                                 */
                                holder = (HOLDER *) hash_search(holderTable,
                                                                                                (void *) holder,
                                                                                                HASH_REMOVE, NULL);
                                if (!holder)
                                        elog(NOTICE, "LockAcquire: remove holder, table corrupted");
                        }
                        else
                                HOLDER_PRINT("LockAcquire: NHOLDING", holder);
                        /* undo the request counts incremented above */
                        lock->nRequested--;
                        lock->requested[lockmode]--;
                        LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
                        Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
                        Assert(lock->nGranted <= lock->nRequested);
                        LWLockRelease(masterLock);
                        return FALSE;
                }

                /*
                 * Construct bitmask of locks this process holds on this object.
                 * Mode i is represented by bit i (mask 1 << i), hence tmpMask
                 * starts at 2 for mode 1.
                 */
                {
                        int                     heldLocks = 0;
                        int                     tmpMask;

                        for (i = 1, tmpMask = 2;
                                 i <= lockMethodTable->ctl->numLockModes;
                                 i++, tmpMask <<= 1)
                        {
                                if (myHolding[i] > 0)
                                        heldLocks |= tmpMask;
                        }
                        MyProc->heldLocks = heldLocks;
                }

                /*
                 * Sleep till someone wakes me up.
                 */
                status = WaitOnLock(lockmethod, lockmode, lock, holder);

                /*
                 * NOTE: do not do any material change of state between here and
                 * return.      All required changes in locktable state must have been
                 * done when the lock was granted to us --- see notes in
                 * WaitOnLock.
                 */

                /*
                 * Check the holder entry status, in case something in the ipc
                 * communication doesn't work correctly.
                 */
                if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
                {
                        HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
                        LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
                        /* Should we retry ? */
                        LWLockRelease(masterLock);
                        return FALSE;
                }
                HOLDER_PRINT("LockAcquire: granted", holder);
                LOCK_PRINT("LockAcquire: granted", lock, lockmode);
        }

        LWLockRelease(masterLock);

        /*
         * status is either the STATUS_OK from the conflict check or whatever
         * WaitOnLock returned.
         */
        return status == STATUS_OK;
}
727
728 /*
729  * LockCheckConflicts -- test whether requested lock conflicts
730  *              with those already granted
731  *
732  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
733  *
734  * NOTES:
735  *              Here's what makes this complicated: one process's locks don't
736  * conflict with one another, even if they are held under different
737  * transaction IDs (eg, session and xact locks do not conflict).
738  * So, we must subtract off our own locks when determining whether the
739  * requested new lock conflicts with those already held.
740  *
741  * The caller can optionally pass the process's total holding counts, if
742  * known.  If NULL is passed then these values will be computed internally.
743  */
744 int
745 LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
746                                    LOCKMODE lockmode,
747                                    LOCK *lock,
748                                    HOLDER *holder,
749                                    PROC *proc,
750                                    int *myHolding)              /* myHolding[] array or NULL */
751 {
752         LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
753         int                     numLockModes = lockctl->numLockModes;
754         int                     bitmask;
755         int                     i,
756                                 tmpMask;
757         int                     localHolding[MAX_LOCKMODES];
758
759         /*
760          * first check for global conflicts: If no locks conflict with my
761          * request, then I get the lock.
762          *
763          * Checking for conflict: lock->grantMask represents the types of
764          * currently held locks.  conflictTable[lockmode] has a bit set for
765          * each type of lock that conflicts with request.       Bitwise compare
766          * tells if there is a conflict.
767          */
768         if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
769         {
770                 HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
771                 return STATUS_OK;
772         }
773
774         /*
775          * Rats.  Something conflicts. But it could still be my own lock.  We
776          * have to construct a conflict mask that does not reflect our own
777          * locks.  Locks held by the current process under another XID also
778          * count as "our own locks".
779          */
780         if (myHolding == NULL)
781         {
782                 /* Caller didn't do calculation of total holding for me */
783                 LockCountMyLocks(holder->tag.lock, proc, localHolding);
784                 myHolding = localHolding;
785         }
786
787         /* Compute mask of lock types held by other processes */
788         bitmask = 0;
789         tmpMask = 2;
790         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
791         {
792                 if (lock->granted[i] != myHolding[i])
793                         bitmask |= tmpMask;
794         }
795
796         /*
797          * now check again for conflicts.  'bitmask' describes the types of
798          * locks held by other processes.  If one of these conflicts with the
799          * kind of lock that I want, there is a conflict and I have to sleep.
800          */
801         if (!(lockctl->conflictTab[lockmode] & bitmask))
802         {
803                 /* no conflict. OK to get the lock */
804                 HOLDER_PRINT("LockCheckConflicts: resolved", holder);
805                 return STATUS_OK;
806         }
807
808         HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
809         return STATUS_FOUND;
810 }
811
812 /*
813  * LockCountMyLocks --- Count total number of locks held on a given lockable
814  *              object by a given process (under any transaction ID).
815  *
816  * XXX This could be rather slow if the process holds a large number of locks.
817  * Perhaps it could be sped up if we kept yet a third hashtable of per-
818  * process lock information.  However, for the normal case where a transaction
819  * doesn't hold a large number of locks, keeping such a table would probably
820  * be a net slowdown.
821  */
822 static void
823 LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
824 {
825         SHM_QUEUE  *procHolders = &(proc->procHolders);
826         HOLDER     *holder;
827         int                     i;
828
829         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
830
831         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
832                                                                          offsetof(HOLDER, procLink));
833
834         while (holder)
835         {
836                 if (lockOffset == holder->tag.lock)
837                 {
838                         for (i = 1; i < MAX_LOCKMODES; i++)
839                                 myHolding[i] += holder->holding[i];
840                 }
841
842                 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
843                                                                                  offsetof(HOLDER, procLink));
844         }
845 }
846
847 /*
848  * GrantLock -- update the lock and holder data structures to show
849  *              the lock request has been granted.
850  *
851  * NOTE: if proc was blocked, it also needs to be removed from the wait list
852  * and have its waitLock/waitHolder fields cleared.  That's not done here.
853  */
854 void
855 GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
856 {
857         lock->nGranted++;
858         lock->granted[lockmode]++;
859         lock->grantMask |= BITS_ON[lockmode];
860         if (lock->granted[lockmode] == lock->requested[lockmode])
861                 lock->waitMask &= BITS_OFF[lockmode];
862         LOCK_PRINT("GrantLock", lock, lockmode);
863         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
864         Assert(lock->nGranted <= lock->nRequested);
865         holder->holding[lockmode]++;
866         holder->nHolding++;
867         Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
868 }
869
870 /*
871  * WaitOnLock -- wait to acquire a lock
872  *
873  * Caller must have set MyProc->heldLocks to reflect locks already held
874  * on the lockable object by this process (under all XIDs).
875  *
876  * The locktable's masterLock must be held at entry.
877  */
878 static int
879 WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
880                    LOCK *lock, HOLDER *holder)
881 {
882         LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
883         char       *new_status,
884                            *old_status;
885
886         Assert(lockmethod < NumLockMethods);
887
888         LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
889
890         old_status = pstrdup(get_ps_display());
891         new_status = (char *) palloc(strlen(old_status) + 10);
892         strcpy(new_status, old_status);
893         strcat(new_status, " waiting");
894         set_ps_display(new_status);
895
896         /*
897          * NOTE: Think not to put any shared-state cleanup after the call to
898          * ProcSleep, in either the normal or failure path.  The lock state
899          * must be fully set by the lock grantor, or by HandleDeadLock if we
900          * give up waiting for the lock.  This is necessary because of the
901          * possibility that a cancel/die interrupt will interrupt ProcSleep
902          * after someone else grants us the lock, but before we've noticed it.
903          * Hence, after granting, the locktable state must fully reflect the
904          * fact that we own the lock; we can't do additional work on return.
905          * Contrariwise, if we fail, any cleanup must happen in xact abort
906          * processing, not here, to ensure it will also happen in the
907          * cancel/die case.
908          */
909
910         if (ProcSleep(lockMethodTable,
911                                   lockmode,
912                                   lock,
913                                   holder) != STATUS_OK)
914         {
915                 /*
916                  * We failed as a result of a deadlock, see HandleDeadLock(). Quit
917                  * now.  Removal of the holder and lock objects, if no longer
918                  * needed, will happen in xact cleanup (see above for motivation).
919                  */
920                 LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
921                 LWLockRelease(lockMethodTable->ctl->masterLock);
922                 elog(ERROR, "deadlock detected");
923                 /* not reached */
924         }
925
926         set_ps_display(old_status);
927         pfree(old_status);
928         pfree(new_status);
929
930         LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
931         return STATUS_OK;
932 }
933
934 /*
935  * Remove a proc from the wait-queue it is on
936  * (caller must know it is on one).
937  *
938  * Locktable lock must be held by caller.
939  *
940  * NB: this does not remove the process' holder object, nor the lock object,
941  * even though their counts might now have gone to zero.  That will happen
942  * during a subsequent LockReleaseAll call, which we expect will happen
943  * during transaction cleanup.  (Removal of a proc from its wait queue by
944  * this routine can only happen if we are aborting the transaction.)
945  */
946 void
947 RemoveFromWaitQueue(PROC *proc)
948 {
949         LOCK       *waitLock = proc->waitLock;
950         LOCKMODE        lockmode = proc->waitLockMode;
951
952         /* Make sure proc is waiting */
953         Assert(proc->links.next != INVALID_OFFSET);
954         Assert(waitLock);
955         Assert(waitLock->waitProcs.size > 0);
956
957         /* Remove proc from lock's wait queue */
958         SHMQueueDelete(&(proc->links));
959         waitLock->waitProcs.size--;
960
961         /* Undo increments of request counts by waiting process */
962         Assert(waitLock->nRequested > 0);
963         Assert(waitLock->nRequested > proc->waitLock->nGranted);
964         waitLock->nRequested--;
965         Assert(waitLock->requested[lockmode] > 0);
966         waitLock->requested[lockmode]--;
967         /* don't forget to clear waitMask bit if appropriate */
968         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
969                 waitLock->waitMask &= BITS_OFF[lockmode];
970
971         /* Clean up the proc's own state */
972         proc->waitLock = NULL;
973         proc->waitHolder = NULL;
974
975         /* See if any other waiters for the lock can be woken up now */
976         ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
977 }
978
979 /*
980  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
981  *              release one 'lockmode' lock on it.
982  *
983  * Side Effects: find any waiting processes that are now wakable,
984  *              grant them their requested locks and awaken them.
985  *              (We have to grant the lock here to avoid a race between
986  *              the waking process and any new process to
987  *              come along and request the lock.)
988  */
989 bool
990 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
991                         TransactionId xid, LOCKMODE lockmode)
992 {
993         LOCK       *lock;
994         LWLockId        masterLock;
995         LOCKMETHODTABLE *lockMethodTable;
996         HOLDER     *holder;
997         HOLDERTAG       holdertag;
998         HTAB       *holderTable;
999         bool            wakeupNeeded = false;
1000
1001 #ifdef LOCK_DEBUG
1002         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
1003                 elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
1004 #endif
1005
1006         /* ???????? This must be changed when short term locks will be used */
1007         locktag->lockmethod = lockmethod;
1008
1009         Assert(lockmethod < NumLockMethods);
1010         lockMethodTable = LockMethodTable[lockmethod];
1011         if (!lockMethodTable)
1012         {
1013                 elog(NOTICE, "lockMethodTable is null in LockRelease");
1014                 return FALSE;
1015         }
1016
1017         masterLock = lockMethodTable->ctl->masterLock;
1018         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1019
1020         /*
1021          * Find a lock with this tag
1022          */
1023         Assert(lockMethodTable->lockHash->hash == tag_hash);
1024         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1025                                                                 (void *) locktag,
1026                                                                 HASH_FIND, NULL);
1027
1028         /*
1029          * let the caller print its own error message, too. Do not
1030          * elog(ERROR).
1031          */
1032         if (!lock)
1033         {
1034                 LWLockRelease(masterLock);
1035                 elog(NOTICE, "LockRelease: no such lock");
1036                 return FALSE;
1037         }
1038         LOCK_PRINT("LockRelease: found", lock, lockmode);
1039
1040         /*
1041          * Find the holder entry for this holder.
1042          */
1043         MemSet(&holdertag, 0, sizeof(HOLDERTAG));       /* must clear padding,
1044                                                                                                  * needed */
1045         holdertag.lock = MAKE_OFFSET(lock);
1046         holdertag.proc = MAKE_OFFSET(MyProc);
1047         TransactionIdStore(xid, &holdertag.xid);
1048
1049         holderTable = lockMethodTable->holderHash;
1050         holder = (HOLDER *) hash_search(holderTable,
1051                                                                         (void *) &holdertag,
1052                                                                         HASH_FIND_SAVE, NULL);
1053         if (!holder)
1054         {
1055                 LWLockRelease(masterLock);
1056 #ifdef USER_LOCKS
1057                 if (lockmethod == USER_LOCKMETHOD)
1058                         elog(NOTICE, "LockRelease: no lock with this tag");
1059                 else
1060 #endif
1061                         elog(NOTICE, "LockRelease: holder table corrupted");
1062                 return FALSE;
1063         }
1064         HOLDER_PRINT("LockRelease: found", holder);
1065
1066         /*
1067          * Check that we are actually holding a lock of the type we want to
1068          * release.
1069          */
1070         if (!(holder->holding[lockmode] > 0))
1071         {
1072                 HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
1073                 Assert(holder->holding[lockmode] >= 0);
1074                 LWLockRelease(masterLock);
1075                 elog(NOTICE, "LockRelease: you don't own a lock of type %s",
1076                          lock_mode_names[lockmode]);
1077                 return FALSE;
1078         }
1079         Assert(holder->nHolding > 0);
1080         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1081         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1082         Assert(lock->nGranted <= lock->nRequested);
1083
1084         /*
1085          * fix the general lock stats
1086          */
1087         lock->nRequested--;
1088         lock->requested[lockmode]--;
1089         lock->nGranted--;
1090         lock->granted[lockmode]--;
1091
1092         if (lock->granted[lockmode] == 0)
1093         {
1094                 /* change the conflict mask.  No more of this lock type. */
1095                 lock->grantMask &= BITS_OFF[lockmode];
1096         }
1097
1098         LOCK_PRINT("LockRelease: updated", lock, lockmode);
1099         Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1100         Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1101         Assert(lock->nGranted <= lock->nRequested);
1102
1103         /*
1104          * We need only run ProcLockWakeup if the released lock conflicts with
1105          * at least one of the lock types requested by waiter(s).  Otherwise
1106          * whatever conflict made them wait must still exist.  NOTE: before
1107          * MVCC, we could skip wakeup if lock->granted[lockmode] was still
1108          * positive. But that's not true anymore, because the remaining
1109          * granted locks might belong to some waiter, who could now be
1110          * awakened because he doesn't conflict with his own locks.
1111          */
1112         if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
1113                 wakeupNeeded = true;
1114
1115         if (lock->nRequested == 0)
1116         {
1117                 /*
1118                  * if there's no one waiting in the queue, we just released the
1119                  * last lock on this object. Delete it from the lock table.
1120                  */
1121                 Assert(lockMethodTable->lockHash->hash == tag_hash);
1122                 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1123                                                                         (void *) &(lock->tag),
1124                                                                         HASH_REMOVE,
1125                                                                         NULL);
1126                 if (!lock)
1127                 {
1128                         LWLockRelease(masterLock);
1129                         elog(NOTICE, "LockRelease: remove lock, table corrupted");
1130                         return FALSE;
1131                 }
1132                 wakeupNeeded = false;   /* should be false, but make sure */
1133         }
1134
1135         /*
1136          * Now fix the per-holder lock stats.
1137          */
1138         holder->holding[lockmode]--;
1139         holder->nHolding--;
1140         HOLDER_PRINT("LockRelease: updated", holder);
1141         Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
1142
1143         /*
1144          * If this was my last hold on this lock, delete my entry in the
1145          * holder table.
1146          */
1147         if (holder->nHolding == 0)
1148         {
1149                 HOLDER_PRINT("LockRelease: deleting", holder);
1150                 SHMQueueDelete(&holder->lockLink);
1151                 SHMQueueDelete(&holder->procLink);
1152                 holder = (HOLDER *) hash_search(holderTable,
1153                                                                                 (void *) &holder,
1154                                                                                 HASH_REMOVE_SAVED, NULL);
1155                 if (!holder)
1156                 {
1157                         LWLockRelease(masterLock);
1158                         elog(NOTICE, "LockRelease: remove holder, table corrupted");
1159                         return FALSE;
1160                 }
1161         }
1162
1163         /*
1164          * Wake up waiters if needed.
1165          */
1166         if (wakeupNeeded)
1167                 ProcLockWakeup(lockMethodTable, lock);
1168
1169         LWLockRelease(masterLock);
1170         return TRUE;
1171 }
1172
1173 /*
1174  * LockReleaseAll -- Release all locks in a process's lock list.
1175  *
1176  * Well, not really *all* locks.
1177  *
1178  * If 'allxids' is TRUE, all locks of the specified lock method are
1179  * released, regardless of transaction affiliation.
1180  *
1181  * If 'allxids' is FALSE, all locks of the specified lock method and
1182  * specified XID are released.
1183  */
1184 bool
1185 LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
1186                            bool allxids, TransactionId xid)
1187 {
1188         SHM_QUEUE  *procHolders = &(proc->procHolders);
1189         HOLDER     *holder;
1190         HOLDER     *nextHolder;
1191         LWLockId        masterLock;
1192         LOCKMETHODTABLE *lockMethodTable;
1193         int                     i,
1194                                 numLockModes;
1195         LOCK       *lock;
1196
1197 #ifdef LOCK_DEBUG
1198         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1199                 elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
1200                          lockmethod, proc->pid);
1201 #endif
1202
1203         Assert(lockmethod < NumLockMethods);
1204         lockMethodTable = LockMethodTable[lockmethod];
1205         if (!lockMethodTable)
1206         {
1207                 elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
1208                 return FALSE;
1209         }
1210
1211         numLockModes = lockMethodTable->ctl->numLockModes;
1212         masterLock = lockMethodTable->ctl->masterLock;
1213
1214         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1215
1216         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1217                                                                          offsetof(HOLDER, procLink));
1218
1219         while (holder)
1220         {
1221                 bool            wakeupNeeded = false;
1222
1223                 /* Get link first, since we may unlink/delete this holder */
1224                 nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1225                                                                                          offsetof(HOLDER, procLink));
1226
1227                 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1228
1229                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1230
1231                 /* Ignore items that are not of the lockmethod to be removed */
1232                 if (LOCK_LOCKMETHOD(*lock) != lockmethod)
1233                         goto next_item;
1234
1235                 /* If not allxids, ignore items that are of the wrong xid */
1236                 if (!allxids && !TransactionIdEquals(xid, holder->tag.xid))
1237                         goto next_item;
1238
1239                 HOLDER_PRINT("LockReleaseAll", holder);
1240                 LOCK_PRINT("LockReleaseAll", lock, 0);
1241                 Assert(lock->nRequested >= 0);
1242                 Assert(lock->nGranted >= 0);
1243                 Assert(lock->nGranted <= lock->nRequested);
1244                 Assert(holder->nHolding >= 0);
1245                 Assert(holder->nHolding <= lock->nRequested);
1246
1247                 /*
1248                  * fix the general lock stats
1249                  */
1250                 if (lock->nRequested != holder->nHolding)
1251                 {
1252                         for (i = 1; i <= numLockModes; i++)
1253                         {
1254                                 Assert(holder->holding[i] >= 0);
1255                                 if (holder->holding[i] > 0)
1256                                 {
1257                                         lock->requested[i] -= holder->holding[i];
1258                                         lock->granted[i] -= holder->holding[i];
1259                                         Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
1260                                         if (lock->granted[i] == 0)
1261                                                 lock->grantMask &= BITS_OFF[i];
1262
1263                                         /*
1264                                          * Read comments in LockRelease
1265                                          */
1266                                         if (!wakeupNeeded &&
1267                                         lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
1268                                                 wakeupNeeded = true;
1269                                 }
1270                         }
1271                         lock->nRequested -= holder->nHolding;
1272                         lock->nGranted -= holder->nHolding;
1273                         Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1274                         Assert(lock->nGranted <= lock->nRequested);
1275                 }
1276                 else
1277                 {
1278                         /*
1279                          * This holder accounts for all the requested locks on the
1280                          * object, so we can be lazy and just zero things out.
1281                          */
1282                         lock->nRequested = 0;
1283                         lock->nGranted = 0;
1284                         /* Fix the lock status, just for next LOCK_PRINT message. */
1285                         for (i = 1; i <= numLockModes; i++)
1286                         {
1287                                 Assert(lock->requested[i] == lock->granted[i]);
1288                                 lock->requested[i] = lock->granted[i] = 0;
1289                         }
1290                 }
1291                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1292
1293                 HOLDER_PRINT("LockReleaseAll: deleting", holder);
1294
1295                 /*
1296                  * Remove the holder entry from the linked lists
1297                  */
1298                 SHMQueueDelete(&holder->lockLink);
1299                 SHMQueueDelete(&holder->procLink);
1300
1301                 /*
1302                  * remove the holder entry from the hashtable
1303                  */
1304                 holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
1305                                                                                 (void *) holder,
1306                                                                                 HASH_REMOVE,
1307                                                                                 NULL);
1308                 if (!holder)
1309                 {
1310                         LWLockRelease(masterLock);
1311                         elog(NOTICE, "LockReleaseAll: holder table corrupted");
1312                         return FALSE;
1313                 }
1314
1315                 if (lock->nRequested == 0)
1316                 {
1317                         /*
1318                          * We've just released the last lock, so garbage-collect the
1319                          * lock object.
1320                          */
1321                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1322                         Assert(lockMethodTable->lockHash->hash == tag_hash);
1323                         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1324                                                                                 (void *) &(lock->tag),
1325                                                                                 HASH_REMOVE, NULL);
1326                         if (!lock)
1327                         {
1328                                 LWLockRelease(masterLock);
1329                                 elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1330                                 return FALSE;
1331                         }
1332                 }
1333                 else if (wakeupNeeded)
1334                         ProcLockWakeup(lockMethodTable, lock);
1335
1336 next_item:
1337                 holder = nextHolder;
1338         }
1339
1340         LWLockRelease(masterLock);
1341
1342 #ifdef LOCK_DEBUG
1343         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1344                 elog(DEBUG, "LockReleaseAll: done");
1345 #endif
1346
1347         return TRUE;
1348 }
1349
1350 int
1351 LockShmemSize(int maxBackends)
1352 {
1353         int                     size = 0;
1354         long            max_table_size = NLOCKENTS(maxBackends);
1355
1356         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1357         size += maxBackends * MAXALIGN(sizeof(PROC));           /* each MyProc */
1358         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each
1359                                                                                                                                  * lockMethodTable->ctl */
1360
1361         /* lockHash table */
1362         size += hash_estimate_size(max_table_size, sizeof(LOCK));
1363
1364         /* holderHash table */
1365         size += hash_estimate_size(max_table_size, sizeof(HOLDER));
1366
1367         /*
1368          * Since the lockHash entry count above is only an estimate, add 10%
1369          * safety margin.
1370          */
1371         size += size / 10;
1372
1373         return size;
1374 }
1375
1376
1377 #ifdef LOCK_DEBUG
1378 /*
1379  * Dump all locks in the proc->procHolders list.
1380  *
1381  * Must have already acquired the masterLock.
1382  */
1383 void
1384 DumpLocks(void)
1385 {
1386         PROC       *proc;
1387         SHM_QUEUE  *procHolders;
1388         HOLDER     *holder;
1389         LOCK       *lock;
1390         int                     lockmethod = DEFAULT_LOCKMETHOD;
1391         LOCKMETHODTABLE *lockMethodTable;
1392
1393         proc = MyProc;
1394         if (proc == NULL)
1395                 return;
1396
1397         procHolders = &proc->procHolders;
1398
1399         Assert(lockmethod < NumLockMethods);
1400         lockMethodTable = LockMethodTable[lockmethod];
1401         if (!lockMethodTable)
1402                 return;
1403
1404         if (proc->waitLock)
1405                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1406
1407         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1408                                                                          offsetof(HOLDER, procLink));
1409
1410         while (holder)
1411         {
1412                 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1413
1414                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1415
1416                 HOLDER_PRINT("DumpLocks", holder);
1417                 LOCK_PRINT("DumpLocks", lock, 0);
1418
1419                 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1420                                                                                  offsetof(HOLDER, procLink));
1421         }
1422 }
1423
1424 /*
1425  * Dump all postgres locks. Must have already acquired the masterLock.
1426  */
1427 void
1428 DumpAllLocks(void)
1429 {
1430         PROC       *proc;
1431         HOLDER     *holder;
1432         LOCK       *lock;
1433         int                     lockmethod = DEFAULT_LOCKMETHOD;
1434         LOCKMETHODTABLE *lockMethodTable;
1435         HTAB       *holderTable;
1436         HASH_SEQ_STATUS status;
1437
1438         proc = MyProc;
1439         if (proc == NULL)
1440                 return;
1441
1442         Assert(lockmethod < NumLockMethods);
1443         lockMethodTable = LockMethodTable[lockmethod];
1444         if (!lockMethodTable)
1445                 return;
1446
1447         holderTable = lockMethodTable->holderHash;
1448
1449         if (proc->waitLock)
1450                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1451
1452         hash_seq_init(&status, holderTable);
1453         while ((holder = (HOLDER *) hash_seq_search(&status)) != NULL)
1454         {
1455                 HOLDER_PRINT("DumpAllLocks", holder);
1456
1457                 if (holder->tag.lock)
1458                 {
1459                         lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1460                         LOCK_PRINT("DumpAllLocks", lock, 0);
1461                 }
1462                 else
1463                         elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
1464         }
1465 }
1466 #endif   /* LOCK_DEBUG */