]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Rename 'holder' references to 'proclock' for PROCLOCK references, for
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.120 2003/02/18 02:13:24 momjian Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll,
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <unistd.h>
34 #include <signal.h>
35
36 #include "access/xact.h"
37 #include "miscadmin.h"
38 #include "storage/proc.h"
39 #include "utils/memutils.h"
40 #include "utils/ps_status.h"
41
42
/* This configuration variable is used to set the lock table size */
int			max_locks_per_xact; /* set by guc.c */

/*
 * Number of lock table entries to provide for the given number of backends.
 *
 * The product is computed in "long" (the type the table sizes are stored
 * in) rather than "int", so that it is widened before the multiplication
 * instead of after it.
 */
#define NLOCKENTS(maxBackends)	((long) max_locks_per_xact * (maxBackends))
47
48
49 static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
50                    LOCK *lock, PROCLOCK *proclock);
51 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
52                                  int *myHolding);
53
/*
 * Printable names of the lock modes, indexed by LOCKMODE value.  Slot 0 is
 * a placeholder, since the named modes start at 1.  The table is read-only:
 * neither the pointers nor the strings are ever meant to be modified.
 */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};
66
67
68 #ifdef LOCK_DEBUG
69
70 /*------
71  * The following configuration options are available for lock debugging:
72  *
73  *         TRACE_LOCKS          -- give a bunch of output about what's going on in this file
74  *         TRACE_USERLOCKS      -- same but for user locks
75  *         TRACE_LOCK_OIDMIN -- do not trace locks for tables below this oid
76  *                                                 (use to avoid output on system tables)
77  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
78  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
79  *
80  * Furthermore, but in storage/lmgr/lwlock.c:
81  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
82  *
83  * Define LOCK_DEBUG at compile time to get all these enabled.
84  * --------
85  */
86
87 int                     Trace_lock_oidmin = BootstrapObjectIdData;
88 bool            Trace_locks = false;
89 bool            Trace_userlocks = false;
90 int                     Trace_lock_table = 0;
91 bool            Debug_deadlocks = false;
92
93
94 inline static bool
95 LOCK_DEBUG_ENABLED(const LOCK *lock)
96 {
97         return
98                 (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
99            || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
100                  && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
101                 || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
102 }
103
104
105 inline static void
106 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
107 {
108         if (LOCK_DEBUG_ENABLED(lock))
109                 elog(LOG,
110                          "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
111                          "req(%d,%d,%d,%d,%d,%d,%d)=%d "
112                          "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
113                          where, MAKE_OFFSET(lock),
114                          lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
115                          lock->tag.objId.blkno, lock->grantMask,
116                          lock->requested[1], lock->requested[2], lock->requested[3],
117                          lock->requested[4], lock->requested[5], lock->requested[6],
118                          lock->requested[7], lock->nRequested,
119                          lock->granted[1], lock->granted[2], lock->granted[3],
120                          lock->granted[4], lock->granted[5], lock->granted[6],
121                          lock->granted[7], lock->nGranted,
122                          lock->waitProcs.size, lock_mode_names[type]);
123 }
124
125
126 inline static void
127 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
128 {
129         if (
130         (((PROCLOCK_LOCKMETHOD(*proclockP) == DEFAULT_LOCKMETHOD && Trace_locks)
131           || (PROCLOCK_LOCKMETHOD(*proclockP) == USER_LOCKMETHOD && Trace_userlocks))
132          && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
133                 || (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table))
134                 )
135                 elog(LOG,
136                          "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
137                          where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
138                          PROCLOCK_LOCKMETHOD(*(proclockP)),
139                          proclockP->tag.proc, proclockP->tag.xid,
140                    proclockP->holding[1], proclockP->holding[2], proclockP->holding[3],
141                    proclockP->holding[4], proclockP->holding[5], proclockP->holding[6],
142                          proclockP->holding[7], proclockP->nHolding);
143 }
144
145 #else                                                   /* not LOCK_DEBUG */
146
/*
 * Without LOCK_DEBUG the trace calls compile to a no-op expression.  They
 * expand to ((void) 0) rather than to nothing so that a call used as the
 * sole body of an if/else is still a real statement, not an empty one.
 */
#define LOCK_PRINT(where, lock, type)		((void) 0)
#define PROCLOCK_PRINT(where, proclockP)	((void) 0)
149 #endif   /* not LOCK_DEBUG */
150
151
152 /*
153  * These are to simplify/speed up some bit arithmetic.
154  *
155  * XXX is a fetch from a static array really faster than a shift?
156  * Wouldn't bet on it...
157  */
158
159 static LOCKMASK BITS_OFF[MAX_LOCKMODES];
160 static LOCKMASK BITS_ON[MAX_LOCKMODES];
161
162 /*
163  * map from lockmethod to the lock table structure
164  */
165 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
166
167 static int      NumLockMethods;
168
169 /*
170  * InitLocks -- Init the lock module.  Create a private data
171  *              structure for constructing conflict masks.
172  */
173 void
174 InitLocks(void)
175 {
176         int                     i;
177         int                     bit;
178
179         bit = 1;
180         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
181         {
182                 BITS_ON[i] = bit;
183                 BITS_OFF[i] = ~bit;
184         }
185 }
186
187
188 /*
189  * Fetch the lock method table associated with a given lock
190  */
191 LOCKMETHODTABLE *
192 GetLocksMethodTable(LOCK *lock)
193 {
194         LOCKMETHOD      lockmethod = LOCK_LOCKMETHOD(*lock);
195
196         Assert(lockmethod > 0 && lockmethod < NumLockMethods);
197         return LockMethodTable[lockmethod];
198 }
199
200
201 /*
202  * LockMethodInit -- initialize the lock table's lock type
203  *              structures
204  *
205  * Notes: just copying.  Should only be called once.
206  */
207 static void
208 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
209                            LOCKMASK *conflictsP,
210                            int numModes)
211 {
212         int                     i;
213
214         lockMethodTable->numLockModes = numModes;
215         numModes++;
216         for (i = 0; i < numModes; i++, conflictsP++)
217                 lockMethodTable->conflictTab[i] = *conflictsP;
218 }
219
220 /*
221  * LockMethodTableInit -- initialize a lock table structure
222  *
223  * NOTE: data structures allocated here are allocated permanently, using
224  * TopMemoryContext and shared memory.  We don't ever release them anyway,
225  * and in normal multi-backend operation the lock table structures set up
226  * by the postmaster are inherited by each backend, so they must be in
227  * TopMemoryContext.
228  */
229 LOCKMETHOD
230 LockMethodTableInit(char *tabName,
231                                         LOCKMASK *conflictsP,
232                                         int numModes,
233                                         int maxBackends)
234 {
235         LOCKMETHODTABLE *lockMethodTable;
236         char       *shmemName;
237         HASHCTL         info;
238         int                     hash_flags;
239         bool            found;
240         long            init_table_size,
241                                 max_table_size;
242
243         if (numModes >= MAX_LOCKMODES)
244         {
245                 elog(WARNING, "LockMethodTableInit: too many lock types %d greater than %d",
246                          numModes, MAX_LOCKMODES);
247                 return INVALID_LOCKMETHOD;
248         }
249
250         /* Compute init/max size to request for lock hashtables */
251         max_table_size = NLOCKENTS(maxBackends);
252         init_table_size = max_table_size / 10;
253
254         /* Allocate a string for the shmem index table lookups. */
255         /* This is just temp space in this routine, so palloc is OK. */
256         shmemName = (char *) palloc(strlen(tabName) + 32);
257
258         /* each lock table has a header in shared memory */
259         sprintf(shmemName, "%s (lock method table)", tabName);
260         lockMethodTable = (LOCKMETHODTABLE *)
261                 ShmemInitStruct(shmemName, sizeof(LOCKMETHODTABLE), &found);
262
263         if (!lockMethodTable)
264                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
265
266         /*
267          * Lock the LWLock for the table (probably not necessary here)
268          */
269         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
270
271         /*
272          * no zero-th table
273          */
274         NumLockMethods = 1;
275
276         /*
277          * we're first - initialize
278          */
279         if (!found)
280         {
281                 MemSet(lockMethodTable, 0, sizeof(LOCKMETHODTABLE));
282                 lockMethodTable->masterLock = LockMgrLock;
283                 lockMethodTable->lockmethod = NumLockMethods;
284         }
285
286         /*
287          * other modules refer to the lock table by a lockmethod ID
288          */
289         LockMethodTable[NumLockMethods] = lockMethodTable;
290         NumLockMethods++;
291         Assert(NumLockMethods <= MAX_LOCK_METHODS);
292
293         /*
294          * allocate a hash table for LOCK structs.      This is used to store
295          * per-locked-object information.
296          */
297         info.keysize = sizeof(LOCKTAG);
298         info.entrysize = sizeof(LOCK);
299         info.hash = tag_hash;
300         hash_flags = (HASH_ELEM | HASH_FUNCTION);
301
302         sprintf(shmemName, "%s (lock hash)", tabName);
303         lockMethodTable->lockHash = ShmemInitHash(shmemName,
304                                                                                           init_table_size,
305                                                                                           max_table_size,
306                                                                                           &info,
307                                                                                           hash_flags);
308
309         if (!lockMethodTable->lockHash)
310                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
311         Assert(lockMethodTable->lockHash->hash == tag_hash);
312
313         /*
314          * allocate a hash table for PROCLOCK structs.  This is used to store
315          * per-lock-proclock information.
316          */
317         info.keysize = sizeof(PROCLOCKTAG);
318         info.entrysize = sizeof(PROCLOCK);
319         info.hash = tag_hash;
320         hash_flags = (HASH_ELEM | HASH_FUNCTION);
321
322         sprintf(shmemName, "%s (proclock hash)", tabName);
323         lockMethodTable->proclockHash = ShmemInitHash(shmemName,
324                                                                                                 init_table_size,
325                                                                                                 max_table_size,
326                                                                                                 &info,
327                                                                                                 hash_flags);
328
329         if (!lockMethodTable->proclockHash)
330                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
331
332         /* init data structures */
333         LockMethodInit(lockMethodTable, conflictsP, numModes);
334
335         LWLockRelease(LockMgrLock);
336
337         pfree(shmemName);
338
339         return lockMethodTable->lockmethod;
340 }
341
342 /*
343  * LockMethodTableRename -- allocate another lockmethod ID to the same
344  *              lock table.
345  *
346  * NOTES: Both the lock module and the lock chain (lchain.c)
347  *              module use table id's to distinguish between different
348  *              kinds of locks.  Short term and long term locks look
349  *              the same to the lock table, but are handled differently
350  *              by the lock chain manager.      This function allows the
351  *              client to use different lockmethods when acquiring/releasing
352  *              short term and long term locks, yet store them all in one hashtable.
353  */
354
355 LOCKMETHOD
356 LockMethodTableRename(LOCKMETHOD lockmethod)
357 {
358         LOCKMETHOD      newLockMethod;
359
360         if (NumLockMethods >= MAX_LOCK_METHODS)
361                 return INVALID_LOCKMETHOD;
362         if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
363                 return INVALID_LOCKMETHOD;
364
365         /* other modules refer to the lock table by a lockmethod ID */
366         newLockMethod = NumLockMethods;
367         NumLockMethods++;
368
369         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
370         return newLockMethod;
371 }
372
373 /*
374  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
375  *              set lock if/when no conflicts.
376  *
377  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
378  *              a FALSE return is to be expected if dontWait is TRUE;
379  *              but if dontWait is FALSE, only a parameter error can cause
380  *              a FALSE return.  (XXX probably we should just elog on parameter
381  *              errors, instead of conflating this with failure to acquire lock?)
382  *
383  * Side Effects: The lock is acquired and recorded in lock tables.
384  *
385  * NOTE: if we wait for the lock, there is no way to abort the wait
386  * short of aborting the transaction.
387  *
388  *
389  * Note on User Locks:
390  *
391  *              User locks are handled totally on the application side as
392  *              long term cooperative locks which extend beyond the normal
393  *              transaction boundaries.  Their purpose is to indicate to an
394  *              application that someone is `working' on an item.  So it is
395  *              possible to put a user lock on a tuple's oid, retrieve the
396  *              tuple, work on it for an hour and then update it and remove
397  *              the lock.  While the lock is active other clients can still
398  *              read and write the tuple but they can be aware that it has
399  *              been locked at the application level by someone.
400  *              User locks use lock tags made of a uint16 and a uint32, for
401  *              example 0 and a tuple oid, or any other arbitrary pair of
402  *              numbers following a convention established by the application.
403  *              In this sense tags don't refer to tuples or database entities.
404  *              User locks and normal locks are completely orthogonal and
405  *              they don't interfere with each other, so it is possible
406  *              to acquire a normal lock on a user-locked tuple or user-lock
407  *              a tuple for which a normal write lock already exists.
408  *              User locks are always non blocking, therefore they are never
409  *              acquired if already held by another process.  They must be
410  *              released explicitly by the application but they are released
411  *              automatically when a backend terminates.
412  *              They are indicated by a lockmethod 2 which is an alias for the
413  *              normal lock table, and are distinguished from normal locks
414  *              by the following differences:
415  *
416  *                                                                              normal lock             user lock
417  *
418  *              lockmethod                                              1                               2
419  *              tag.dbId                                                database oid    database oid
420  *              tag.relId                                               rel oid or 0    0
421  *              tag.objId                                               block id                lock id2
422  *                                                                              or xact id
423  *              tag.offnum                                              0                               lock id1
424  *              proclock.xid                                    xid or 0                0
425  *              persistence                                             transaction             user or backend
426  *                                                                              or backend
427  *
428  *              The lockmode parameter can have the same values for normal locks
429  *              although probably only WRITE_LOCK can have some practical use.
430  *
431  *                                                                                                              DZ - 22 Nov 1997
432  */
433
434 bool
435 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
436                         TransactionId xid, LOCKMODE lockmode, bool dontWait)
437 {
438         PROCLOCK   *proclock;
439         PROCLOCKTAG proclocktag;
440         HTAB       *proclockTable;
441         bool            found;
442         LOCK       *lock;
443         LWLockId        masterLock;
444         LOCKMETHODTABLE *lockMethodTable;
445         int                     status;
446         int                     myHolding[MAX_LOCKMODES];
447         int                     i;
448
449 #ifdef LOCK_DEBUG
450         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
451                 elog(LOG, "LockAcquire: user lock [%u] %s",
452                          locktag->objId.blkno, lock_mode_names[lockmode]);
453 #endif
454
455         /* ???????? This must be changed when short term locks will be used */
456         locktag->lockmethod = lockmethod;
457
458         Assert(lockmethod < NumLockMethods);
459         lockMethodTable = LockMethodTable[lockmethod];
460         if (!lockMethodTable)
461         {
462                 elog(WARNING, "LockAcquire: bad lock table %d", lockmethod);
463                 return FALSE;
464         }
465
466         masterLock = lockMethodTable->masterLock;
467
468         LWLockAcquire(masterLock, LW_EXCLUSIVE);
469
470         /*
471          * Find or create a lock with this tag
472          */
473         Assert(lockMethodTable->lockHash->hash == tag_hash);
474         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
475                                                                 (void *) locktag,
476                                                                 HASH_ENTER, &found);
477         if (!lock)
478         {
479                 LWLockRelease(masterLock);
480                 elog(ERROR, "LockAcquire: lock table %d is out of memory",
481                          lockmethod);
482                 return FALSE;
483         }
484
485         /*
486          * if it's a new lock object, initialize it
487          */
488         if (!found)
489         {
490                 lock->grantMask = 0;
491                 lock->waitMask = 0;
492                 SHMQueueInit(&(lock->lockHolders));
493                 ProcQueueInit(&(lock->waitProcs));
494                 lock->nRequested = 0;
495                 lock->nGranted = 0;
496                 MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
497                 MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
498                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
499         }
500         else
501         {
502                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
503                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
504                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
505                 Assert(lock->nGranted <= lock->nRequested);
506         }
507
508         /*
509          * Create the hash key for the proclock table.
510          */
511         MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
512                                                                                                  * needed */
513         proclocktag.lock = MAKE_OFFSET(lock);
514         proclocktag.proc = MAKE_OFFSET(MyProc);
515         TransactionIdStore(xid, &proclocktag.xid);
516
517         /*
518          * Find or create a proclock entry with this tag
519          */
520         proclockTable = lockMethodTable->proclockHash;
521         proclock = (PROCLOCK *) hash_search(proclockTable,
522                                                                           (void *) &proclocktag,
523                                                                           HASH_ENTER, &found);
524         if (!proclock)
525         {
526                 LWLockRelease(masterLock);
527                 elog(ERROR, "LockAcquire: proclock table out of memory");
528                 return FALSE;
529         }
530
531         /*
532          * If new, initialize the new entry
533          */
534         if (!found)
535         {
536                 proclock->nHolding = 0;
537                 MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES);
538                 /* Add proclock to appropriate lists */
539                 SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink);
540                 SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink);
541                 PROCLOCK_PRINT("LockAcquire: new", proclock);
542         }
543         else
544         {
545                 PROCLOCK_PRINT("LockAcquire: found", proclock);
546                 Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
547                 Assert(proclock->nHolding <= lock->nGranted);
548
549 #ifdef CHECK_DEADLOCK_RISK
550
551                 /*
552                  * Issue warning if we already hold a lower-level lock on this
553                  * object and do not hold a lock of the requested level or higher.
554                  * This indicates a deadlock-prone coding practice (eg, we'd have
555                  * a deadlock if another backend were following the same code path
556                  * at about the same time).
557                  *
558                  * This is not enabled by default, because it may generate log
559                  * entries about user-level coding practices that are in fact safe
560                  * in context. It can be enabled to help find system-level
561                  * problems.
562                  *
563                  * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
564                  * better to use a table.  For now, though, this works.
565                  */
566                 for (i = lockMethodTable->numLockModes; i > 0; i--)
567                 {
568                         if (proclock->holding[i] > 0)
569                         {
570                                 if (i >= (int) lockmode)
571                                         break;          /* safe: we have a lock >= req level */
572                                 elog(LOG, "Deadlock risk: raising lock level"
573                                          " from %s to %s on object %u/%u/%u",
574                                          lock_mode_names[i], lock_mode_names[lockmode],
575                                  lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
576                                 break;
577                         }
578                 }
579 #endif   /* CHECK_DEADLOCK_RISK */
580         }
581
582         /*
583          * lock->nRequested and lock->requested[] count the total number of
584          * requests, whether granted or waiting, so increment those
585          * immediately. The other counts don't increment till we get the lock.
586          */
587         lock->nRequested++;
588         lock->requested[lockmode]++;
589         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
590
591         /*
592          * If I already hold one or more locks of the requested type, just
593          * grant myself another one without blocking.
594          */
595         if (proclock->holding[lockmode] > 0)
596         {
597                 GrantLock(lock, proclock, lockmode);
598                 PROCLOCK_PRINT("LockAcquire: owning", proclock);
599                 LWLockRelease(masterLock);
600                 return TRUE;
601         }
602
603         /*
604          * If this process (under any XID) is a proclock of the lock, also grant
605          * myself another one without blocking.
606          */
607         LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
608         if (myHolding[lockmode] > 0)
609         {
610                 GrantLock(lock, proclock, lockmode);
611                 PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
612                 LWLockRelease(masterLock);
613                 return TRUE;
614         }
615
616         /*
617          * If lock requested conflicts with locks requested by waiters, must
618          * join wait queue.  Otherwise, check for conflict with already-held
619          * locks.  (That's last because most complex check.)
620          */
621         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
622                 status = STATUS_FOUND;
623         else
624                 status = LockCheckConflicts(lockMethodTable, lockmode,
625                                                                         lock, proclock,
626                                                                         MyProc, myHolding);
627
628         if (status == STATUS_OK)
629         {
630                 /* No conflict with held or previously requested locks */
631                 GrantLock(lock, proclock, lockmode);
632         }
633         else
634         {
635                 Assert(status == STATUS_FOUND);
636
637                 /*
638                  * We can't acquire the lock immediately.  If caller specified no
639                  * blocking, remove the proclock entry and return FALSE without
640                  * waiting.
641                  */
642                 if (dontWait)
643                 {
644                         if (proclock->nHolding == 0)
645                         {
646                                 SHMQueueDelete(&proclock->lockLink);
647                                 SHMQueueDelete(&proclock->procLink);
648                                 proclock = (PROCLOCK *) hash_search(proclockTable,
649                                                                                                   (void *) proclock,
650                                                                                                   HASH_REMOVE, NULL);
651                                 if (!proclock)
652                                         elog(WARNING, "LockAcquire: remove proclock, table corrupted");
653                         }
654                         else
655                                 PROCLOCK_PRINT("LockAcquire: NHOLDING", proclock);
656                         lock->nRequested--;
657                         lock->requested[lockmode]--;
658                         LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
659                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
660                         Assert(lock->nGranted <= lock->nRequested);
661                         LWLockRelease(masterLock);
662                         return FALSE;
663                 }
664
665                 /*
666                  * Construct bitmask of locks this process holds on this object.
667                  */
668                 {
669                         int                     heldLocks = 0;
670                         int                     tmpMask;
671
672                         for (i = 1, tmpMask = 2;
673                                  i <= lockMethodTable->numLockModes;
674                                  i++, tmpMask <<= 1)
675                         {
676                                 if (myHolding[i] > 0)
677                                         heldLocks |= tmpMask;
678                         }
679                         MyProc->heldLocks = heldLocks;
680                 }
681
682                 /*
683                  * Sleep till someone wakes me up.
684                  */
685                 status = WaitOnLock(lockmethod, lockmode, lock, proclock);
686
687                 /*
688                  * NOTE: do not do any material change of state between here and
689                  * return.      All required changes in locktable state must have been
690                  * done when the lock was granted to us --- see notes in
691                  * WaitOnLock.
692                  */
693
694                 /*
695                  * Check the proclock entry status, in case something in the ipc
696                  * communication doesn't work correctly.
697                  */
698                 if (!((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0)))
699                 {
700                         PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
701                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
702                         /* Should we retry ? */
703                         LWLockRelease(masterLock);
704                         return FALSE;
705                 }
706                 PROCLOCK_PRINT("LockAcquire: granted", proclock);
707                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
708         }
709
710         LWLockRelease(masterLock);
711
712         return status == STATUS_OK;
713 }
714
715 /*
716  * LockCheckConflicts -- test whether requested lock conflicts
717  *              with those already granted
718  *
719  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
720  *
721  * NOTES:
722  *              Here's what makes this complicated: one process's locks don't
723  * conflict with one another, even if they are held under different
724  * transaction IDs (eg, session and xact locks do not conflict).
725  * So, we must subtract off our own locks when determining whether the
726  * requested new lock conflicts with those already held.
727  *
728  * The caller can optionally pass the process's total holding counts, if
729  * known.  If NULL is passed then these values will be computed internally.
730  */
731 int
732 LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
733                                    LOCKMODE lockmode,
734                                    LOCK *lock,
735                                    PROCLOCK *proclock,
736                                    PGPROC *proc,
737                                    int *myHolding)              /* myHolding[] array or NULL */
738 {
739         int                     numLockModes = lockMethodTable->numLockModes;
740         int                     bitmask;
741         int                     i,
742                                 tmpMask;
743         int                     localHolding[MAX_LOCKMODES];
744
745         /*
746          * first check for global conflicts: If no locks conflict with my
747          * request, then I get the lock.
748          *
749          * Checking for conflict: lock->grantMask represents the types of
750          * currently held locks.  conflictTable[lockmode] has a bit set for
751          * each type of lock that conflicts with request.       Bitwise compare
752          * tells if there is a conflict.
753          */
754         if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
755         {
756                 PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
757                 return STATUS_OK;
758         }
759
760         /*
761          * Rats.  Something conflicts. But it could still be my own lock.  We
762          * have to construct a conflict mask that does not reflect our own
763          * locks.  Locks held by the current process under another XID also
764          * count as "our own locks".
765          */
766         if (myHolding == NULL)
767         {
768                 /* Caller didn't do calculation of total holding for me */
769                 LockCountMyLocks(proclock->tag.lock, proc, localHolding);
770                 myHolding = localHolding;
771         }
772
773         /* Compute mask of lock types held by other processes */
774         bitmask = 0;
775         tmpMask = 2;
776         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
777         {
778                 if (lock->granted[i] != myHolding[i])
779                         bitmask |= tmpMask;
780         }
781
782         /*
783          * now check again for conflicts.  'bitmask' describes the types of
784          * locks held by other processes.  If one of these conflicts with the
785          * kind of lock that I want, there is a conflict and I have to sleep.
786          */
787         if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
788         {
789                 /* no conflict. OK to get the lock */
790                 PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
791                 return STATUS_OK;
792         }
793
794         PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
795         return STATUS_FOUND;
796 }
797
798 /*
799  * LockCountMyLocks --- Count total number of locks held on a given lockable
800  *              object by a given process (under any transaction ID).
801  *
802  * XXX This could be rather slow if the process holds a large number of locks.
803  * Perhaps it could be sped up if we kept yet a third hashtable of per-
804  * process lock information.  However, for the normal case where a transaction
805  * doesn't hold a large number of locks, keeping such a table would probably
806  * be a net slowdown.
807  */
808 static void
809 LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
810 {
811         SHM_QUEUE  *procHolders = &(proc->procHolders);
812         PROCLOCK   *proclock;
813         int                     i;
814
815         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
816
817         proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
818                                                                            offsetof(PROCLOCK, procLink));
819
820         while (proclock)
821         {
822                 if (lockOffset == proclock->tag.lock)
823                 {
824                         for (i = 1; i < MAX_LOCKMODES; i++)
825                                 myHolding[i] += proclock->holding[i];
826                 }
827
828                 proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
829                                                                                    offsetof(PROCLOCK, procLink));
830         }
831 }
832
833 /*
834  * GrantLock -- update the lock and proclock data structures to show
835  *              the lock request has been granted.
836  *
837  * NOTE: if proc was blocked, it also needs to be removed from the wait list
838  * and have its waitLock/waitHolder fields cleared.  That's not done here.
839  */
840 void
841 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
842 {
843         lock->nGranted++;
844         lock->granted[lockmode]++;
845         lock->grantMask |= BITS_ON[lockmode];
846         if (lock->granted[lockmode] == lock->requested[lockmode])
847                 lock->waitMask &= BITS_OFF[lockmode];
848         LOCK_PRINT("GrantLock", lock, lockmode);
849         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
850         Assert(lock->nGranted <= lock->nRequested);
851         proclock->holding[lockmode]++;
852         proclock->nHolding++;
853         Assert((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0));
854 }
855
856 /*
857  * WaitOnLock -- wait to acquire a lock
858  *
859  * Caller must have set MyProc->heldLocks to reflect locks already held
860  * on the lockable object by this process (under all XIDs).
861  *
862  * The locktable's masterLock must be held at entry.
863  */
864 static int
865 WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
866                    LOCK *lock, PROCLOCK *proclock)
867 {
868         LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
869         char       *new_status,
870                            *old_status;
871
872         Assert(lockmethod < NumLockMethods);
873
874         LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
875
876         old_status = pstrdup(get_ps_display());
877         new_status = (char *) palloc(strlen(old_status) + 10);
878         strcpy(new_status, old_status);
879         strcat(new_status, " waiting");
880         set_ps_display(new_status);
881
882         /*
883          * NOTE: Think not to put any shared-state cleanup after the call to
884          * ProcSleep, in either the normal or failure path.  The lock state
885          * must be fully set by the lock grantor, or by CheckDeadLock if we
886          * give up waiting for the lock.  This is necessary because of the
887          * possibility that a cancel/die interrupt will interrupt ProcSleep
888          * after someone else grants us the lock, but before we've noticed it.
889          * Hence, after granting, the locktable state must fully reflect the
890          * fact that we own the lock; we can't do additional work on return.
891          * Contrariwise, if we fail, any cleanup must happen in xact abort
892          * processing, not here, to ensure it will also happen in the
893          * cancel/die case.
894          */
895
896         if (ProcSleep(lockMethodTable,
897                                   lockmode,
898                                   lock,
899                                   proclock) != STATUS_OK)
900         {
901                 /*
902                  * We failed as a result of a deadlock, see CheckDeadLock(). Quit
903                  * now.  Removal of the proclock and lock objects, if no longer
904                  * needed, will happen in xact cleanup (see above for motivation).
905                  */
906                 LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
907                 LWLockRelease(lockMethodTable->masterLock);
908                 /*
909                  * Now that we aren't holding the LockMgrLock, print details about
910                  * the detected deadlock.  We didn't want to do this before because
911                  * sending elog messages to the client while holding the shared lock
912                  * is bad for concurrency.
913                  */
914                 DeadLockReport();
915                 elog(ERROR, "deadlock detected");
916                 /* not reached */
917         }
918
919         set_ps_display(old_status);
920         pfree(old_status);
921         pfree(new_status);
922
923         LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
924         return STATUS_OK;
925 }
926
927 /*
928  * Remove a proc from the wait-queue it is on
929  * (caller must know it is on one).
930  *
931  * Locktable lock must be held by caller.
932  *
933  * NB: this does not remove the process' proclock object, nor the lock object,
934  * even though their counts might now have gone to zero.  That will happen
935  * during a subsequent LockReleaseAll call, which we expect will happen
936  * during transaction cleanup.  (Removal of a proc from its wait queue by
937  * this routine can only happen if we are aborting the transaction.)
938  */
939 void
940 RemoveFromWaitQueue(PGPROC *proc)
941 {
942         LOCK       *waitLock = proc->waitLock;
943         LOCKMODE        lockmode = proc->waitLockMode;
944
945         /* Make sure proc is waiting */
946         Assert(proc->links.next != INVALID_OFFSET);
947         Assert(waitLock);
948         Assert(waitLock->waitProcs.size > 0);
949
950         /* Remove proc from lock's wait queue */
951         SHMQueueDelete(&(proc->links));
952         waitLock->waitProcs.size--;
953
954         /* Undo increments of request counts by waiting process */
955         Assert(waitLock->nRequested > 0);
956         Assert(waitLock->nRequested > proc->waitLock->nGranted);
957         waitLock->nRequested--;
958         Assert(waitLock->requested[lockmode] > 0);
959         waitLock->requested[lockmode]--;
960         /* don't forget to clear waitMask bit if appropriate */
961         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
962                 waitLock->waitMask &= BITS_OFF[lockmode];
963
964         /* Clean up the proc's own state */
965         proc->waitLock = NULL;
966         proc->waitHolder = NULL;
967
968         /* See if any other waiters for the lock can be woken up now */
969         ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
970 }
971
972 /*
973  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
974  *              release one 'lockmode' lock on it.
975  *
976  * Side Effects: find any waiting processes that are now wakable,
977  *              grant them their requested locks and awaken them.
978  *              (We have to grant the lock here to avoid a race between
979  *              the waking process and any new process to
980  *              come along and request the lock.)
981  */
982 bool
983 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
984                         TransactionId xid, LOCKMODE lockmode)
985 {
986         LOCK       *lock;
987         LWLockId        masterLock;
988         LOCKMETHODTABLE *lockMethodTable;
989         PROCLOCK   *proclock;
990         PROCLOCKTAG proclocktag;
991         HTAB       *proclockTable;
992         bool            wakeupNeeded = false;
993
994 #ifdef LOCK_DEBUG
995         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
996                 elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
997 #endif
998
999         /* ???????? This must be changed when short term locks will be used */
1000         locktag->lockmethod = lockmethod;
1001
1002         Assert(lockmethod < NumLockMethods);
1003         lockMethodTable = LockMethodTable[lockmethod];
1004         if (!lockMethodTable)
1005         {
1006                 elog(WARNING, "lockMethodTable is null in LockRelease");
1007                 return FALSE;
1008         }
1009
1010         masterLock = lockMethodTable->masterLock;
1011         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1012
1013         /*
1014          * Find a lock with this tag
1015          */
1016         Assert(lockMethodTable->lockHash->hash == tag_hash);
1017         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1018                                                                 (void *) locktag,
1019                                                                 HASH_FIND, NULL);
1020
1021         /*
1022          * let the caller print its own error message, too. Do not
1023          * elog(ERROR).
1024          */
1025         if (!lock)
1026         {
1027                 LWLockRelease(masterLock);
1028                 elog(WARNING, "LockRelease: no such lock");
1029                 return FALSE;
1030         }
1031         LOCK_PRINT("LockRelease: found", lock, lockmode);
1032
1033         /*
1034          * Find the proclock entry for this proclock.
1035          */
1036         MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
1037                                                                                                  * needed */
1038         proclocktag.lock = MAKE_OFFSET(lock);
1039         proclocktag.proc = MAKE_OFFSET(MyProc);
1040         TransactionIdStore(xid, &proclocktag.xid);
1041
1042         proclockTable = lockMethodTable->proclockHash;
1043         proclock = (PROCLOCK *) hash_search(proclockTable,
1044                                                                           (void *) &proclocktag,
1045                                                                           HASH_FIND_SAVE, NULL);
1046         if (!proclock)
1047         {
1048                 LWLockRelease(masterLock);
1049 #ifdef USER_LOCKS
1050                 if (lockmethod == USER_LOCKMETHOD)
1051                         elog(WARNING, "LockRelease: no lock with this tag");
1052                 else
1053 #endif
1054                         elog(WARNING, "LockRelease: proclock table corrupted");
1055                 return FALSE;
1056         }
1057         PROCLOCK_PRINT("LockRelease: found", proclock);
1058
1059         /*
1060          * Check that we are actually holding a lock of the type we want to
1061          * release.
1062          */
1063         if (!(proclock->holding[lockmode] > 0))
1064         {
1065                 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
1066                 Assert(proclock->holding[lockmode] >= 0);
1067                 LWLockRelease(masterLock);
1068                 elog(WARNING, "LockRelease: you don't own a lock of type %s",
1069                          lock_mode_names[lockmode]);
1070                 return FALSE;
1071         }
1072         Assert(proclock->nHolding > 0);
1073         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1074         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1075         Assert(lock->nGranted <= lock->nRequested);
1076
1077         /*
1078          * fix the general lock stats
1079          */
1080         lock->nRequested--;
1081         lock->requested[lockmode]--;
1082         lock->nGranted--;
1083         lock->granted[lockmode]--;
1084
1085         if (lock->granted[lockmode] == 0)
1086         {
1087                 /* change the conflict mask.  No more of this lock type. */
1088                 lock->grantMask &= BITS_OFF[lockmode];
1089         }
1090
1091         LOCK_PRINT("LockRelease: updated", lock, lockmode);
1092         Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1093         Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1094         Assert(lock->nGranted <= lock->nRequested);
1095
1096         /*
1097          * We need only run ProcLockWakeup if the released lock conflicts with
1098          * at least one of the lock types requested by waiter(s).  Otherwise
1099          * whatever conflict made them wait must still exist.  NOTE: before
1100          * MVCC, we could skip wakeup if lock->granted[lockmode] was still
1101          * positive. But that's not true anymore, because the remaining
1102          * granted locks might belong to some waiter, who could now be
1103          * awakened because he doesn't conflict with his own locks.
1104          */
1105         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1106                 wakeupNeeded = true;
1107
1108         if (lock->nRequested == 0)
1109         {
1110                 /*
1111                  * if there's no one waiting in the queue, we just released the
1112                  * last lock on this object. Delete it from the lock table.
1113                  */
1114                 Assert(lockMethodTable->lockHash->hash == tag_hash);
1115                 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1116                                                                         (void *) &(lock->tag),
1117                                                                         HASH_REMOVE,
1118                                                                         NULL);
1119                 if (!lock)
1120                 {
1121                         LWLockRelease(masterLock);
1122                         elog(WARNING, "LockRelease: remove lock, table corrupted");
1123                         return FALSE;
1124                 }
1125                 wakeupNeeded = false;   /* should be false, but make sure */
1126         }
1127
1128         /*
1129          * Now fix the per-proclock lock stats.
1130          */
1131         proclock->holding[lockmode]--;
1132         proclock->nHolding--;
1133         PROCLOCK_PRINT("LockRelease: updated", proclock);
1134         Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
1135
1136         /*
1137          * If this was my last hold on this lock, delete my entry in the
1138          * proclock table.
1139          */
1140         if (proclock->nHolding == 0)
1141         {
1142                 PROCLOCK_PRINT("LockRelease: deleting", proclock);
1143                 SHMQueueDelete(&proclock->lockLink);
1144                 SHMQueueDelete(&proclock->procLink);
1145                 proclock = (PROCLOCK *) hash_search(proclockTable,
1146                                                                                   (void *) &proclock,
1147                                                                                   HASH_REMOVE_SAVED, NULL);
1148                 if (!proclock)
1149                 {
1150                         LWLockRelease(masterLock);
1151                         elog(WARNING, "LockRelease: remove proclock, table corrupted");
1152                         return FALSE;
1153                 }
1154         }
1155
1156         /*
1157          * Wake up waiters if needed.
1158          */
1159         if (wakeupNeeded)
1160                 ProcLockWakeup(lockMethodTable, lock);
1161
1162         LWLockRelease(masterLock);
1163         return TRUE;
1164 }
1165
1166 /*
1167  * LockReleaseAll -- Release all locks in a process's lock list.
1168  *
1169  * Well, not really *all* locks.
1170  *
1171  * If 'allxids' is TRUE, all locks of the specified lock method are
1172  * released, regardless of transaction affiliation.
1173  *
1174  * If 'allxids' is FALSE, all locks of the specified lock method and
1175  * specified XID are released.
1176  */
1177 bool
1178 LockReleaseAll(LOCKMETHOD lockmethod, PGPROC *proc,
1179                            bool allxids, TransactionId xid)
1180 {
1181         SHM_QUEUE  *procHolders = &(proc->procHolders);
1182         PROCLOCK   *proclock;
1183         PROCLOCK   *nextHolder;
1184         LWLockId        masterLock;
1185         LOCKMETHODTABLE *lockMethodTable;
1186         int                     i,
1187                                 numLockModes;
1188         LOCK       *lock;
1189
1190 #ifdef LOCK_DEBUG
1191         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1192                 elog(LOG, "LockReleaseAll: lockmethod=%d, pid=%d",
1193                          lockmethod, proc->pid);
1194 #endif
1195
1196         Assert(lockmethod < NumLockMethods);
1197         lockMethodTable = LockMethodTable[lockmethod];
1198         if (!lockMethodTable)
1199         {
1200                 elog(WARNING, "LockReleaseAll: bad lockmethod %d", lockmethod);
1201                 return FALSE;
1202         }
1203
1204         numLockModes = lockMethodTable->numLockModes;
1205         masterLock = lockMethodTable->masterLock;
1206
1207         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1208
1209         proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
1210                                                                            offsetof(PROCLOCK, procLink));
1211
1212         while (proclock)
1213         {
1214                 bool            wakeupNeeded = false;
1215
1216                 /* Get link first, since we may unlink/delete this proclock */
1217                 nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
1218                                                                                    offsetof(PROCLOCK, procLink));
1219
1220                 Assert(proclock->tag.proc == MAKE_OFFSET(proc));
1221
1222                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1223
1224                 /* Ignore items that are not of the lockmethod to be removed */
1225                 if (LOCK_LOCKMETHOD(*lock) != lockmethod)
1226                         goto next_item;
1227
1228                 /* If not allxids, ignore items that are of the wrong xid */
1229                 if (!allxids && !TransactionIdEquals(xid, proclock->tag.xid))
1230                         goto next_item;
1231
1232                 PROCLOCK_PRINT("LockReleaseAll", proclock);
1233                 LOCK_PRINT("LockReleaseAll", lock, 0);
1234                 Assert(lock->nRequested >= 0);
1235                 Assert(lock->nGranted >= 0);
1236                 Assert(lock->nGranted <= lock->nRequested);
1237                 Assert(proclock->nHolding >= 0);
1238                 Assert(proclock->nHolding <= lock->nRequested);
1239
1240                 /*
1241                  * fix the general lock stats
1242                  */
1243                 if (lock->nRequested != proclock->nHolding)
1244                 {
1245                         for (i = 1; i <= numLockModes; i++)
1246                         {
1247                                 Assert(proclock->holding[i] >= 0);
1248                                 if (proclock->holding[i] > 0)
1249                                 {
1250                                         lock->requested[i] -= proclock->holding[i];
1251                                         lock->granted[i] -= proclock->holding[i];
1252                                         Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
1253                                         if (lock->granted[i] == 0)
1254                                                 lock->grantMask &= BITS_OFF[i];
1255
1256                                         /*
1257                                          * Read comments in LockRelease
1258                                          */
1259                                         if (!wakeupNeeded &&
1260                                                 lockMethodTable->conflictTab[i] & lock->waitMask)
1261                                                 wakeupNeeded = true;
1262                                 }
1263                         }
1264                         lock->nRequested -= proclock->nHolding;
1265                         lock->nGranted -= proclock->nHolding;
1266                         Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1267                         Assert(lock->nGranted <= lock->nRequested);
1268                 }
1269                 else
1270                 {
1271                         /*
1272                          * This proclock accounts for all the requested locks on the
1273                          * object, so we can be lazy and just zero things out.
1274                          */
1275                         lock->nRequested = 0;
1276                         lock->nGranted = 0;
1277                         /* Fix the lock status, just for next LOCK_PRINT message. */
1278                         for (i = 1; i <= numLockModes; i++)
1279                         {
1280                                 Assert(lock->requested[i] == lock->granted[i]);
1281                                 lock->requested[i] = lock->granted[i] = 0;
1282                         }
1283                 }
1284                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1285
1286                 PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
1287
1288                 /*
1289                  * Remove the proclock entry from the linked lists
1290                  */
1291                 SHMQueueDelete(&proclock->lockLink);
1292                 SHMQueueDelete(&proclock->procLink);
1293
1294                 /*
1295                  * remove the proclock entry from the hashtable
1296                  */
1297                 proclock = (PROCLOCK *) hash_search(lockMethodTable->proclockHash,
1298                                                                                   (void *) proclock,
1299                                                                                   HASH_REMOVE,
1300                                                                                   NULL);
1301                 if (!proclock)
1302                 {
1303                         LWLockRelease(masterLock);
1304                         elog(WARNING, "LockReleaseAll: proclock table corrupted");
1305                         return FALSE;
1306                 }
1307
1308                 if (lock->nRequested == 0)
1309                 {
1310                         /*
1311                          * We've just released the last lock, so garbage-collect the
1312                          * lock object.
1313                          */
1314                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1315                         Assert(lockMethodTable->lockHash->hash == tag_hash);
1316                         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1317                                                                                 (void *) &(lock->tag),
1318                                                                                 HASH_REMOVE, NULL);
1319                         if (!lock)
1320                         {
1321                                 LWLockRelease(masterLock);
1322                                 elog(WARNING, "LockReleaseAll: cannot remove lock from HTAB");
1323                                 return FALSE;
1324                         }
1325                 }
1326                 else if (wakeupNeeded)
1327                         ProcLockWakeup(lockMethodTable, lock);
1328
1329 next_item:
1330                 proclock = nextHolder;
1331         }
1332
1333         LWLockRelease(masterLock);
1334
1335 #ifdef LOCK_DEBUG
1336         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1337                 elog(LOG, "LockReleaseAll: done");
1338 #endif
1339
1340         return TRUE;
1341 }
1342
1343 int
1344 LockShmemSize(int maxBackends)
1345 {
1346         int                     size = 0;
1347         long            max_table_size = NLOCKENTS(maxBackends);
1348
1349         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1350         size += maxBackends * MAXALIGN(sizeof(PGPROC));         /* each MyProc */
1351         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODTABLE));           /* each lockMethodTable */
1352
1353         /* lockHash table */
1354         size += hash_estimate_size(max_table_size, sizeof(LOCK));
1355
1356         /* proclockHash table */
1357         size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
1358
1359         /*
1360          * Since the lockHash entry count above is only an estimate, add 10%
1361          * safety margin.
1362          */
1363         size += size / 10;
1364
1365         return size;
1366 }
1367
1368 /*
1369  * GetLockStatusData - Return a summary of the lock manager's internal
1370  * status, for use in a user-level reporting function.
1371  *
1372  * The return data consists of an array of PROCLOCK objects, with the
1373  * associated PGPROC and LOCK objects for each.  Note that multiple
1374  * copies of the same PGPROC and/or LOCK objects are likely to appear.
1375  * It is the caller's responsibility to match up duplicates if wanted.
1376  *
1377  * The design goal is to hold the LockMgrLock for as short a time as possible;
1378  * thus, this function simply makes a copy of the necessary data and releases
1379  * the lock, allowing the caller to contemplate and format the data for as
1380  * long as it pleases.
1381  */
1382 LockData *
1383 GetLockStatusData(void)
1384 {
1385         LockData   *data;
1386         HTAB       *proclockTable;
1387         PROCLOCK   *proclock;
1388         HASH_SEQ_STATUS seqstat;
1389         int                     i;
1390
1391         data = (LockData *) palloc(sizeof(LockData));
1392
1393         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
1394
1395         proclockTable = LockMethodTable[DEFAULT_LOCKMETHOD]->proclockHash;
1396
1397         data->nelements = i = proclockTable->hctl->nentries;
1398
1399         if (i == 0)
1400                 i = 1;                                  /* avoid palloc(0) if empty table */
1401
1402         data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
1403         data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
1404         data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
1405         data->locks = (LOCK *) palloc(sizeof(LOCK) * i);
1406
1407         hash_seq_init(&seqstat, proclockTable);
1408
1409         i = 0;
1410         while ((proclock = hash_seq_search(&seqstat)))
1411         {
1412                 PGPROC     *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
1413                 LOCK       *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1414
1415                 data->proclockaddrs[i] = MAKE_OFFSET(proclock);
1416                 memcpy(&(data->proclocks[i]), proclock, sizeof(PROCLOCK));
1417                 memcpy(&(data->procs[i]), proc, sizeof(PGPROC));
1418                 memcpy(&(data->locks[i]), lock, sizeof(LOCK));
1419
1420                 i++;
1421         }
1422
1423         LWLockRelease(LockMgrLock);
1424
1425         Assert(i == data->nelements);
1426
1427         return data;
1428 }
1429
1430 /* Provide the textual name of any lock mode */
1431 const char *
1432 GetLockmodeName(LOCKMODE mode)
1433 {
1434         Assert(mode <= MAX_LOCKMODES);
1435         return lock_mode_names[mode];
1436 }
1437
1438 #ifdef LOCK_DEBUG
1439 /*
1440  * Dump all locks in the proc->procHolders list.
1441  *
1442  * Must have already acquired the masterLock.
1443  */
1444 void
1445 DumpLocks(void)
1446 {
1447         PGPROC     *proc;
1448         SHM_QUEUE  *procHolders;
1449         PROCLOCK   *proclock;
1450         LOCK       *lock;
1451         int                     lockmethod = DEFAULT_LOCKMETHOD;
1452         LOCKMETHODTABLE *lockMethodTable;
1453
1454         proc = MyProc;
1455         if (proc == NULL)
1456                 return;
1457
1458         procHolders = &proc->procHolders;
1459
1460         Assert(lockmethod < NumLockMethods);
1461         lockMethodTable = LockMethodTable[lockmethod];
1462         if (!lockMethodTable)
1463                 return;
1464
1465         if (proc->waitLock)
1466                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1467
1468         proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
1469                                                                            offsetof(PROCLOCK, procLink));
1470
1471         while (proclock)
1472         {
1473                 Assert(proclock->tag.proc == MAKE_OFFSET(proc));
1474
1475                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1476
1477                 PROCLOCK_PRINT("DumpLocks", proclock);
1478                 LOCK_PRINT("DumpLocks", lock, 0);
1479
1480                 proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
1481                                                                                    offsetof(PROCLOCK, procLink));
1482         }
1483 }
1484
1485 /*
1486  * Dump all postgres locks. Must have already acquired the masterLock.
1487  */
1488 void
1489 DumpAllLocks(void)
1490 {
1491         PGPROC     *proc;
1492         PROCLOCK   *proclock;
1493         LOCK       *lock;
1494         int                     lockmethod = DEFAULT_LOCKMETHOD;
1495         LOCKMETHODTABLE *lockMethodTable;
1496         HTAB       *proclockTable;
1497         HASH_SEQ_STATUS status;
1498
1499         proc = MyProc;
1500         if (proc == NULL)
1501                 return;
1502
1503         Assert(lockmethod < NumLockMethods);
1504         lockMethodTable = LockMethodTable[lockmethod];
1505         if (!lockMethodTable)
1506                 return;
1507
1508         proclockTable = lockMethodTable->proclockHash;
1509
1510         if (proc->waitLock)
1511                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1512
1513         hash_seq_init(&status, proclockTable);
1514         while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
1515         {
1516                 PROCLOCK_PRINT("DumpAllLocks", proclock);
1517
1518                 if (proclock->tag.lock)
1519                 {
1520                         lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1521                         LOCK_PRINT("DumpAllLocks", lock, 0);
1522                 }
1523                 else
1524                         elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
1525         }
1526 }
1527
1528 #endif   /* LOCK_DEBUG */