]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Fix some bogus comments.
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.118 2002/11/01 00:40:23 tgl Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll,
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <unistd.h>
34 #include <signal.h>
35
36 #include "access/xact.h"
37 #include "miscadmin.h"
38 #include "storage/proc.h"
39 #include "utils/memutils.h"
40 #include "utils/ps_status.h"
41
42
43 /* This configuration variable is used to set the lock table size */
44 int                     max_locks_per_xact; /* set by guc.c */
45
46 #define NLOCKENTS(maxBackends)  (max_locks_per_xact * (maxBackends))
47
48
49 static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
50                    LOCK *lock, PROCLOCK *holder);
51 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
52                                  int *myHolding);
53
54 static char *lock_mode_names[] =
55 {
56         "INVALID",
57         "AccessShareLock",
58         "RowShareLock",
59         "RowExclusiveLock",
60         "ShareUpdateExclusiveLock",
61         "ShareLock",
62         "ShareRowExclusiveLock",
63         "ExclusiveLock",
64         "AccessExclusiveLock"
65 };
66
67
68 #ifdef LOCK_DEBUG
69
70 /*------
71  * The following configuration options are available for lock debugging:
72  *
73  *         TRACE_LOCKS          -- give a bunch of output what's going on in this file
74  *         TRACE_USERLOCKS      -- same but for user locks
75  *         TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
76  *                                                 (use to avoid output on system tables)
77  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
78  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
79  *
80  * Furthermore, but in storage/lmgr/lwlock.c:
81  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
82  *
83  * Define LOCK_DEBUG at compile time to get all these enabled.
84  * --------
85  */
86
87 int                     Trace_lock_oidmin = BootstrapObjectIdData;
88 bool            Trace_locks = false;
89 bool            Trace_userlocks = false;
90 int                     Trace_lock_table = 0;
91 bool            Debug_deadlocks = false;
92
93
94 inline static bool
95 LOCK_DEBUG_ENABLED(const LOCK *lock)
96 {
97         return
98                 (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
99            || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
100                  && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
101                 || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
102 }
103
104
105 inline static void
106 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
107 {
108         if (LOCK_DEBUG_ENABLED(lock))
109                 elog(LOG,
110                          "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
111                          "req(%d,%d,%d,%d,%d,%d,%d)=%d "
112                          "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
113                          where, MAKE_OFFSET(lock),
114                          lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
115                          lock->tag.objId.blkno, lock->grantMask,
116                          lock->requested[1], lock->requested[2], lock->requested[3],
117                          lock->requested[4], lock->requested[5], lock->requested[6],
118                          lock->requested[7], lock->nRequested,
119                          lock->granted[1], lock->granted[2], lock->granted[3],
120                          lock->granted[4], lock->granted[5], lock->granted[6],
121                          lock->granted[7], lock->nGranted,
122                          lock->waitProcs.size, lock_mode_names[type]);
123 }
124
125
126 inline static void
127 PROCLOCK_PRINT(const char *where, const PROCLOCK *holderP)
128 {
129         if (
130         (((PROCLOCK_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
131           || (PROCLOCK_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
132          && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
133                 || (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
134                 )
135                 elog(LOG,
136                          "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
137                          where, MAKE_OFFSET(holderP), holderP->tag.lock,
138                          PROCLOCK_LOCKMETHOD(*(holderP)),
139                          holderP->tag.proc, holderP->tag.xid,
140                    holderP->holding[1], holderP->holding[2], holderP->holding[3],
141                    holderP->holding[4], holderP->holding[5], holderP->holding[6],
142                          holderP->holding[7], holderP->nHolding);
143 }
144
145 #else                                                   /* not LOCK_DEBUG */
146
147 #define LOCK_PRINT(where, lock, type)
148 #define PROCLOCK_PRINT(where, holderP)
149 #endif   /* not LOCK_DEBUG */
150
151
152 /*
153  * These are to simplify/speed up some bit arithmetic.
154  *
155  * XXX is a fetch from a static array really faster than a shift?
156  * Wouldn't bet on it...
157  */
158
159 static LOCKMASK BITS_OFF[MAX_LOCKMODES];
160 static LOCKMASK BITS_ON[MAX_LOCKMODES];
161
162 /*
163  * map from lockmethod to the lock table structure
164  */
165 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
166
167 static int      NumLockMethods;
168
169 /*
170  * InitLocks -- Init the lock module.  Create a private data
171  *              structure for constructing conflict masks.
172  */
173 void
174 InitLocks(void)
175 {
176         int                     i;
177         int                     bit;
178
179         bit = 1;
180         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
181         {
182                 BITS_ON[i] = bit;
183                 BITS_OFF[i] = ~bit;
184         }
185 }
186
187
188 /*
189  * Fetch the lock method table associated with a given lock
190  */
191 LOCKMETHODTABLE *
192 GetLocksMethodTable(LOCK *lock)
193 {
194         LOCKMETHOD      lockmethod = LOCK_LOCKMETHOD(*lock);
195
196         Assert(lockmethod > 0 && lockmethod < NumLockMethods);
197         return LockMethodTable[lockmethod];
198 }
199
200
201 /*
202  * LockMethodInit -- initialize the lock table's lock type
203  *              structures
204  *
205  * Notes: just copying.  Should only be called once.
206  */
207 static void
208 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
209                            LOCKMASK *conflictsP,
210                            int numModes)
211 {
212         int                     i;
213
214         lockMethodTable->numLockModes = numModes;
215         numModes++;
216         for (i = 0; i < numModes; i++, conflictsP++)
217                 lockMethodTable->conflictTab[i] = *conflictsP;
218 }
219
220 /*
221  * LockMethodTableInit -- initialize a lock table structure
222  *
223  * NOTE: data structures allocated here are allocated permanently, using
224  * TopMemoryContext and shared memory.  We don't ever release them anyway,
225  * and in normal multi-backend operation the lock table structures set up
226  * by the postmaster are inherited by each backend, so they must be in
227  * TopMemoryContext.
228  */
229 LOCKMETHOD
230 LockMethodTableInit(char *tabName,
231                                         LOCKMASK *conflictsP,
232                                         int numModes,
233                                         int maxBackends)
234 {
235         LOCKMETHODTABLE *lockMethodTable;
236         char       *shmemName;
237         HASHCTL         info;
238         int                     hash_flags;
239         bool            found;
240         long            init_table_size,
241                                 max_table_size;
242
243         if (numModes >= MAX_LOCKMODES)
244         {
245                 elog(WARNING, "LockMethodTableInit: too many lock types %d greater than %d",
246                          numModes, MAX_LOCKMODES);
247                 return INVALID_LOCKMETHOD;
248         }
249
250         /* Compute init/max size to request for lock hashtables */
251         max_table_size = NLOCKENTS(maxBackends);
252         init_table_size = max_table_size / 10;
253
254         /* Allocate a string for the shmem index table lookups. */
255         /* This is just temp space in this routine, so palloc is OK. */
256         shmemName = (char *) palloc(strlen(tabName) + 32);
257
258         /* each lock table has a header in shared memory */
259         sprintf(shmemName, "%s (lock method table)", tabName);
260         lockMethodTable = (LOCKMETHODTABLE *)
261                 ShmemInitStruct(shmemName, sizeof(LOCKMETHODTABLE), &found);
262
263         if (!lockMethodTable)
264                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
265
266         /*
267          * Lock the LWLock for the table (probably not necessary here)
268          */
269         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
270
271         /*
272          * no zero-th table
273          */
274         NumLockMethods = 1;
275
276         /*
277          * we're first - initialize
278          */
279         if (!found)
280         {
281                 MemSet(lockMethodTable, 0, sizeof(LOCKMETHODTABLE));
282                 lockMethodTable->masterLock = LockMgrLock;
283                 lockMethodTable->lockmethod = NumLockMethods;
284         }
285
286         /*
287          * other modules refer to the lock table by a lockmethod ID
288          */
289         LockMethodTable[NumLockMethods] = lockMethodTable;
290         NumLockMethods++;
291         Assert(NumLockMethods <= MAX_LOCK_METHODS);
292
293         /*
294          * allocate a hash table for LOCK structs.      This is used to store
295          * per-locked-object information.
296          */
297         info.keysize = sizeof(LOCKTAG);
298         info.entrysize = sizeof(LOCK);
299         info.hash = tag_hash;
300         hash_flags = (HASH_ELEM | HASH_FUNCTION);
301
302         sprintf(shmemName, "%s (lock hash)", tabName);
303         lockMethodTable->lockHash = ShmemInitHash(shmemName,
304                                                                                           init_table_size,
305                                                                                           max_table_size,
306                                                                                           &info,
307                                                                                           hash_flags);
308
309         if (!lockMethodTable->lockHash)
310                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
311         Assert(lockMethodTable->lockHash->hash == tag_hash);
312
313         /*
314          * allocate a hash table for PROCLOCK structs.  This is used to store
315          * per-lock-holder information.
316          */
317         info.keysize = sizeof(PROCLOCKTAG);
318         info.entrysize = sizeof(PROCLOCK);
319         info.hash = tag_hash;
320         hash_flags = (HASH_ELEM | HASH_FUNCTION);
321
322         sprintf(shmemName, "%s (holder hash)", tabName);
323         lockMethodTable->holderHash = ShmemInitHash(shmemName,
324                                                                                                 init_table_size,
325                                                                                                 max_table_size,
326                                                                                                 &info,
327                                                                                                 hash_flags);
328
329         if (!lockMethodTable->holderHash)
330                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
331
332         /* init data structures */
333         LockMethodInit(lockMethodTable, conflictsP, numModes);
334
335         LWLockRelease(LockMgrLock);
336
337         pfree(shmemName);
338
339         return lockMethodTable->lockmethod;
340 }
341
342 /*
343  * LockMethodTableRename -- allocate another lockmethod ID to the same
344  *              lock table.
345  *
346  * NOTES: Both the lock module and the lock chain (lchain.c)
347  *              module use table id's to distinguish between different
348  *              kinds of locks.  Short term and long term locks look
349  *              the same to the lock table, but are handled differently
350  *              by the lock chain manager.      This function allows the
351  *              client to use different lockmethods when acquiring/releasing
352  *              short term and long term locks, yet store them all in one hashtable.
353  */
354
355 LOCKMETHOD
356 LockMethodTableRename(LOCKMETHOD lockmethod)
357 {
358         LOCKMETHOD      newLockMethod;
359
360         if (NumLockMethods >= MAX_LOCK_METHODS)
361                 return INVALID_LOCKMETHOD;
362         if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
363                 return INVALID_LOCKMETHOD;
364
365         /* other modules refer to the lock table by a lockmethod ID */
366         newLockMethod = NumLockMethods;
367         NumLockMethods++;
368
369         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
370         return newLockMethod;
371 }
372
373 /*
374  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
375  *              set lock if/when no conflicts.
376  *
377  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
378  *              a FALSE return is to be expected if dontWait is TRUE;
379  *              but if dontWait is FALSE, only a parameter error can cause
380  *              a FALSE return.  (XXX probably we should just elog on parameter
381  *              errors, instead of conflating this with failure to acquire lock?)
382  *
383  * Side Effects: The lock is acquired and recorded in lock tables.
384  *
385  * NOTE: if we wait for the lock, there is no way to abort the wait
386  * short of aborting the transaction.
387  *
388  *
389  * Note on User Locks:
390  *
391  *              User locks are handled totally on the application side as
392  *              long term cooperative locks which extend beyond the normal
393  *              transaction boundaries.  Their purpose is to indicate to an
394  *              application that someone is `working' on an item.  So it is
395  *              possible to put an user lock on a tuple's oid, retrieve the
396  *              tuple, work on it for an hour and then update it and remove
397  *              the lock.  While the lock is active other clients can still
398  *              read and write the tuple but they can be aware that it has
399  *              been locked at the application level by someone.
400  *              User locks use lock tags made of an uint16 and an uint32, for
401  *              example 0 and a tuple oid, or any other arbitrary pair of
402  *              numbers following a convention established by the application.
403  *              In this sense tags don't refer to tuples or database entities.
404  *              User locks and normal locks are completely orthogonal and
405  *              they don't interfere with each other, so it is possible
406  *              to acquire a normal lock on an user-locked tuple or user-lock
407  *              a tuple for which a normal write lock already exists.
408  *              User locks are always non blocking, therefore they are never
409  *              acquired if already held by another process.  They must be
410  *              released explicitly by the application but they are released
411  *              automatically when a backend terminates.
412  *              They are indicated by a lockmethod 2 which is an alias for the
413  *              normal lock table, and are distinguished from normal locks
414  *              by the following differences:
415  *
416  *                                                                              normal lock             user lock
417  *
418  *              lockmethod                                              1                               2
419  *              tag.dbId                                                database oid    database oid
420  *              tag.relId                                               rel oid or 0    0
421  *              tag.objId                                               block id                lock id2
422  *                                                                              or xact id
423  *              tag.offnum                                              0                               lock id1
424  *              holder.xid                                              xid or 0                0
425  *              persistence                                             transaction             user or backend
426  *                                                                              or backend
427  *
428  *              The lockmode parameter can have the same values for normal locks
429  *              although probably only WRITE_LOCK can have some practical use.
430  *
431  *                                                                                                              DZ - 22 Nov 1997
432  */
433
434 bool
435 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
436                         TransactionId xid, LOCKMODE lockmode, bool dontWait)
437 {
438         PROCLOCK   *holder;
439         PROCLOCKTAG holdertag;
440         HTAB       *holderTable;
441         bool            found;
442         LOCK       *lock;
443         LWLockId        masterLock;
444         LOCKMETHODTABLE *lockMethodTable;
445         int                     status;
446         int                     myHolding[MAX_LOCKMODES];
447         int                     i;
448
449 #ifdef LOCK_DEBUG
450         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
451                 elog(LOG, "LockAcquire: user lock [%u] %s",
452                          locktag->objId.blkno, lock_mode_names[lockmode]);
453 #endif
454
455         /* ???????? This must be changed when short term locks will be used */
456         locktag->lockmethod = lockmethod;
457
458         Assert(lockmethod < NumLockMethods);
459         lockMethodTable = LockMethodTable[lockmethod];
460         if (!lockMethodTable)
461         {
462                 elog(WARNING, "LockAcquire: bad lock table %d", lockmethod);
463                 return FALSE;
464         }
465
466         masterLock = lockMethodTable->masterLock;
467
468         LWLockAcquire(masterLock, LW_EXCLUSIVE);
469
470         /*
471          * Find or create a lock with this tag
472          */
473         Assert(lockMethodTable->lockHash->hash == tag_hash);
474         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
475                                                                 (void *) locktag,
476                                                                 HASH_ENTER, &found);
477         if (!lock)
478         {
479                 LWLockRelease(masterLock);
480                 elog(ERROR, "LockAcquire: lock table %d is out of memory",
481                          lockmethod);
482                 return FALSE;
483         }
484
485         /*
486          * if it's a new lock object, initialize it
487          */
488         if (!found)
489         {
490                 lock->grantMask = 0;
491                 lock->waitMask = 0;
492                 SHMQueueInit(&(lock->lockHolders));
493                 ProcQueueInit(&(lock->waitProcs));
494                 lock->nRequested = 0;
495                 lock->nGranted = 0;
496                 MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
497                 MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
498                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
499         }
500         else
501         {
502                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
503                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
504                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
505                 Assert(lock->nGranted <= lock->nRequested);
506         }
507
508         /*
509          * Create the hash key for the holder table.
510          */
511         MemSet(&holdertag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
512                                                                                                  * needed */
513         holdertag.lock = MAKE_OFFSET(lock);
514         holdertag.proc = MAKE_OFFSET(MyProc);
515         TransactionIdStore(xid, &holdertag.xid);
516
517         /*
518          * Find or create a holder entry with this tag
519          */
520         holderTable = lockMethodTable->holderHash;
521         holder = (PROCLOCK *) hash_search(holderTable,
522                                                                           (void *) &holdertag,
523                                                                           HASH_ENTER, &found);
524         if (!holder)
525         {
526                 LWLockRelease(masterLock);
527                 elog(ERROR, "LockAcquire: holder table out of memory");
528                 return FALSE;
529         }
530
531         /*
532          * If new, initialize the new entry
533          */
534         if (!found)
535         {
536                 holder->nHolding = 0;
537                 MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
538                 /* Add holder to appropriate lists */
539                 SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
540                 SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
541                 PROCLOCK_PRINT("LockAcquire: new", holder);
542         }
543         else
544         {
545                 PROCLOCK_PRINT("LockAcquire: found", holder);
546                 Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
547                 Assert(holder->nHolding <= lock->nGranted);
548
549 #ifdef CHECK_DEADLOCK_RISK
550
551                 /*
552                  * Issue warning if we already hold a lower-level lock on this
553                  * object and do not hold a lock of the requested level or higher.
554                  * This indicates a deadlock-prone coding practice (eg, we'd have
555                  * a deadlock if another backend were following the same code path
556                  * at about the same time).
557                  *
558                  * This is not enabled by default, because it may generate log
559                  * entries about user-level coding practices that are in fact safe
560                  * in context. It can be enabled to help find system-level
561                  * problems.
562                  *
563                  * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
564                  * better to use a table.  For now, though, this works.
565                  */
566                 for (i = lockMethodTable->numLockModes; i > 0; i--)
567                 {
568                         if (holder->holding[i] > 0)
569                         {
570                                 if (i >= (int) lockmode)
571                                         break;          /* safe: we have a lock >= req level */
572                                 elog(LOG, "Deadlock risk: raising lock level"
573                                          " from %s to %s on object %u/%u/%u",
574                                          lock_mode_names[i], lock_mode_names[lockmode],
575                                  lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
576                                 break;
577                         }
578                 }
579 #endif   /* CHECK_DEADLOCK_RISK */
580         }
581
582         /*
583          * lock->nRequested and lock->requested[] count the total number of
584          * requests, whether granted or waiting, so increment those
585          * immediately. The other counts don't increment till we get the lock.
586          */
587         lock->nRequested++;
588         lock->requested[lockmode]++;
589         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
590
591         /*
592          * If I already hold one or more locks of the requested type, just
593          * grant myself another one without blocking.
594          */
595         if (holder->holding[lockmode] > 0)
596         {
597                 GrantLock(lock, holder, lockmode);
598                 PROCLOCK_PRINT("LockAcquire: owning", holder);
599                 LWLockRelease(masterLock);
600                 return TRUE;
601         }
602
603         /*
604          * If this process (under any XID) is a holder of the lock, also grant
605          * myself another one without blocking.
606          */
607         LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
608         if (myHolding[lockmode] > 0)
609         {
610                 GrantLock(lock, holder, lockmode);
611                 PROCLOCK_PRINT("LockAcquire: my other XID owning", holder);
612                 LWLockRelease(masterLock);
613                 return TRUE;
614         }
615
616         /*
617          * If lock requested conflicts with locks requested by waiters, must
618          * join wait queue.  Otherwise, check for conflict with already-held
619          * locks.  (That's last because most complex check.)
620          */
621         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
622                 status = STATUS_FOUND;
623         else
624                 status = LockCheckConflicts(lockMethodTable, lockmode,
625                                                                         lock, holder,
626                                                                         MyProc, myHolding);
627
628         if (status == STATUS_OK)
629         {
630                 /* No conflict with held or previously requested locks */
631                 GrantLock(lock, holder, lockmode);
632         }
633         else
634         {
635                 Assert(status == STATUS_FOUND);
636
637                 /*
638                  * We can't acquire the lock immediately.  If caller specified no
639                  * blocking, remove the holder entry and return FALSE without
640                  * waiting.
641                  */
642                 if (dontWait)
643                 {
644                         if (holder->nHolding == 0)
645                         {
646                                 SHMQueueDelete(&holder->lockLink);
647                                 SHMQueueDelete(&holder->procLink);
648                                 holder = (PROCLOCK *) hash_search(holderTable,
649                                                                                                   (void *) holder,
650                                                                                                   HASH_REMOVE, NULL);
651                                 if (!holder)
652                                         elog(WARNING, "LockAcquire: remove holder, table corrupted");
653                         }
654                         else
655                                 PROCLOCK_PRINT("LockAcquire: NHOLDING", holder);
656                         lock->nRequested--;
657                         lock->requested[lockmode]--;
658                         LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
659                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
660                         Assert(lock->nGranted <= lock->nRequested);
661                         LWLockRelease(masterLock);
662                         return FALSE;
663                 }
664
665                 /*
666                  * Construct bitmask of locks this process holds on this object.
667                  */
668                 {
669                         int                     heldLocks = 0;
670                         int                     tmpMask;
671
672                         for (i = 1, tmpMask = 2;
673                                  i <= lockMethodTable->numLockModes;
674                                  i++, tmpMask <<= 1)
675                         {
676                                 if (myHolding[i] > 0)
677                                         heldLocks |= tmpMask;
678                         }
679                         MyProc->heldLocks = heldLocks;
680                 }
681
682                 /*
683                  * Sleep till someone wakes me up.
684                  */
685                 status = WaitOnLock(lockmethod, lockmode, lock, holder);
686
687                 /*
688                  * NOTE: do not do any material change of state between here and
689                  * return.      All required changes in locktable state must have been
690                  * done when the lock was granted to us --- see notes in
691                  * WaitOnLock.
692                  */
693
694                 /*
695                  * Check the holder entry status, in case something in the ipc
696                  * communication doesn't work correctly.
697                  */
698                 if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
699                 {
700                         PROCLOCK_PRINT("LockAcquire: INCONSISTENT", holder);
701                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
702                         /* Should we retry ? */
703                         LWLockRelease(masterLock);
704                         return FALSE;
705                 }
706                 PROCLOCK_PRINT("LockAcquire: granted", holder);
707                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
708         }
709
710         LWLockRelease(masterLock);
711
712         return status == STATUS_OK;
713 }
714
715 /*
716  * LockCheckConflicts -- test whether requested lock conflicts
717  *              with those already granted
718  *
719  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
720  *
721  * NOTES:
722  *              Here's what makes this complicated: one process's locks don't
723  * conflict with one another, even if they are held under different
724  * transaction IDs (eg, session and xact locks do not conflict).
725  * So, we must subtract off our own locks when determining whether the
726  * requested new lock conflicts with those already held.
727  *
728  * The caller can optionally pass the process's total holding counts, if
729  * known.  If NULL is passed then these values will be computed internally.
730  */
731 int
732 LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
733                                    LOCKMODE lockmode,
734                                    LOCK *lock,
735                                    PROCLOCK *holder,
736                                    PGPROC *proc,
737                                    int *myHolding)              /* myHolding[] array or NULL */
738 {
739         int                     numLockModes = lockMethodTable->numLockModes;
740         int                     bitmask;
741         int                     i,
742                                 tmpMask;
743         int                     localHolding[MAX_LOCKMODES];
744
745         /*
746          * first check for global conflicts: If no locks conflict with my
747          * request, then I get the lock.
748          *
749          * Checking for conflict: lock->grantMask represents the types of
750          * currently held locks.  conflictTable[lockmode] has a bit set for
751          * each type of lock that conflicts with request.       Bitwise compare
752          * tells if there is a conflict.
753          */
754         if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
755         {
756                 PROCLOCK_PRINT("LockCheckConflicts: no conflict", holder);
757                 return STATUS_OK;
758         }
759
760         /*
761          * Rats.  Something conflicts. But it could still be my own lock.  We
762          * have to construct a conflict mask that does not reflect our own
763          * locks.  Locks held by the current process under another XID also
764          * count as "our own locks".
765          */
766         if (myHolding == NULL)
767         {
768                 /* Caller didn't do calculation of total holding for me */
769                 LockCountMyLocks(holder->tag.lock, proc, localHolding);
770                 myHolding = localHolding;
771         }
772
773         /* Compute mask of lock types held by other processes */
774         bitmask = 0;
775         tmpMask = 2;
776         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
777         {
778                 if (lock->granted[i] != myHolding[i])
779                         bitmask |= tmpMask;
780         }
781
782         /*
783          * now check again for conflicts.  'bitmask' describes the types of
784          * locks held by other processes.  If one of these conflicts with the
785          * kind of lock that I want, there is a conflict and I have to sleep.
786          */
787         if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
788         {
789                 /* no conflict. OK to get the lock */
790                 PROCLOCK_PRINT("LockCheckConflicts: resolved", holder);
791                 return STATUS_OK;
792         }
793
794         PROCLOCK_PRINT("LockCheckConflicts: conflicting", holder);
795         return STATUS_FOUND;
796 }
797
798 /*
799  * LockCountMyLocks --- Count total number of locks held on a given lockable
800  *              object by a given process (under any transaction ID).
801  *
802  * XXX This could be rather slow if the process holds a large number of locks.
803  * Perhaps it could be sped up if we kept yet a third hashtable of per-
804  * process lock information.  However, for the normal case where a transaction
805  * doesn't hold a large number of locks, keeping such a table would probably
806  * be a net slowdown.
807  */
808 static void
809 LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
810 {
811         SHM_QUEUE  *procHolders = &(proc->procHolders);
812         PROCLOCK   *holder;
813         int                     i;
814
815         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
816
817         holder = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
818                                                                            offsetof(PROCLOCK, procLink));
819
820         while (holder)
821         {
822                 if (lockOffset == holder->tag.lock)
823                 {
824                         for (i = 1; i < MAX_LOCKMODES; i++)
825                                 myHolding[i] += holder->holding[i];
826                 }
827
828                 holder = (PROCLOCK *) SHMQueueNext(procHolders, &holder->procLink,
829                                                                                    offsetof(PROCLOCK, procLink));
830         }
831 }
832
833 /*
834  * GrantLock -- update the lock and holder data structures to show
835  *              the lock request has been granted.
836  *
837  * NOTE: if proc was blocked, it also needs to be removed from the wait list
838  * and have its waitLock/waitHolder fields cleared.  That's not done here.
839  */
840 void
841 GrantLock(LOCK *lock, PROCLOCK *holder, LOCKMODE lockmode)
842 {
843         lock->nGranted++;
844         lock->granted[lockmode]++;
845         lock->grantMask |= BITS_ON[lockmode];
846         if (lock->granted[lockmode] == lock->requested[lockmode])
847                 lock->waitMask &= BITS_OFF[lockmode];
848         LOCK_PRINT("GrantLock", lock, lockmode);
849         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
850         Assert(lock->nGranted <= lock->nRequested);
851         holder->holding[lockmode]++;
852         holder->nHolding++;
853         Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
854 }
855
856 /*
857  * WaitOnLock -- wait to acquire a lock
858  *
859  * Caller must have set MyProc->heldLocks to reflect locks already held
860  * on the lockable object by this process (under all XIDs).
861  *
862  * The locktable's masterLock must be held at entry.
863  */
864 static int
865 WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
866                    LOCK *lock, PROCLOCK *holder)
867 {
868         LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
869         char       *new_status,
870                            *old_status;
871
872         Assert(lockmethod < NumLockMethods);
873
874         LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
875
876         old_status = pstrdup(get_ps_display());
877         new_status = (char *) palloc(strlen(old_status) + 10);
878         strcpy(new_status, old_status);
879         strcat(new_status, " waiting");
880         set_ps_display(new_status);
881
882         /*
883          * NOTE: Think not to put any shared-state cleanup after the call to
884          * ProcSleep, in either the normal or failure path.  The lock state
885          * must be fully set by the lock grantor, or by CheckDeadLock if we
886          * give up waiting for the lock.  This is necessary because of the
887          * possibility that a cancel/die interrupt will interrupt ProcSleep
888          * after someone else grants us the lock, but before we've noticed it.
889          * Hence, after granting, the locktable state must fully reflect the
890          * fact that we own the lock; we can't do additional work on return.
891          * Contrariwise, if we fail, any cleanup must happen in xact abort
892          * processing, not here, to ensure it will also happen in the
893          * cancel/die case.
894          */
895
896         if (ProcSleep(lockMethodTable,
897                                   lockmode,
898                                   lock,
899                                   holder) != STATUS_OK)
900         {
901                 /*
902                  * We failed as a result of a deadlock, see CheckDeadLock(). Quit
903                  * now.  Removal of the holder and lock objects, if no longer
904                  * needed, will happen in xact cleanup (see above for motivation).
905                  */
906                 LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
907                 LWLockRelease(lockMethodTable->masterLock);
908                 elog(ERROR, "deadlock detected");
909                 /* not reached */
910         }
911
912         set_ps_display(old_status);
913         pfree(old_status);
914         pfree(new_status);
915
916         LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
917         return STATUS_OK;
918 }
919
920 /*
921  * Remove a proc from the wait-queue it is on
922  * (caller must know it is on one).
923  *
924  * Locktable lock must be held by caller.
925  *
926  * NB: this does not remove the process' holder object, nor the lock object,
927  * even though their counts might now have gone to zero.  That will happen
928  * during a subsequent LockReleaseAll call, which we expect will happen
929  * during transaction cleanup.  (Removal of a proc from its wait queue by
930  * this routine can only happen if we are aborting the transaction.)
931  */
932 void
933 RemoveFromWaitQueue(PGPROC *proc)
934 {
935         LOCK       *waitLock = proc->waitLock;
936         LOCKMODE        lockmode = proc->waitLockMode;
937
938         /* Make sure proc is waiting */
939         Assert(proc->links.next != INVALID_OFFSET);
940         Assert(waitLock);
941         Assert(waitLock->waitProcs.size > 0);
942
943         /* Remove proc from lock's wait queue */
944         SHMQueueDelete(&(proc->links));
945         waitLock->waitProcs.size--;
946
947         /* Undo increments of request counts by waiting process */
948         Assert(waitLock->nRequested > 0);
949         Assert(waitLock->nRequested > proc->waitLock->nGranted);
950         waitLock->nRequested--;
951         Assert(waitLock->requested[lockmode] > 0);
952         waitLock->requested[lockmode]--;
953         /* don't forget to clear waitMask bit if appropriate */
954         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
955                 waitLock->waitMask &= BITS_OFF[lockmode];
956
957         /* Clean up the proc's own state */
958         proc->waitLock = NULL;
959         proc->waitHolder = NULL;
960
961         /* See if any other waiters for the lock can be woken up now */
962         ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
963 }
964
965 /*
966  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
967  *              release one 'lockmode' lock on it.
968  *
969  * Side Effects: find any waiting processes that are now wakable,
970  *              grant them their requested locks and awaken them.
971  *              (We have to grant the lock here to avoid a race between
972  *              the waking process and any new process to
973  *              come along and request the lock.)
974  */
975 bool
976 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
977                         TransactionId xid, LOCKMODE lockmode)
978 {
979         LOCK       *lock;
980         LWLockId        masterLock;
981         LOCKMETHODTABLE *lockMethodTable;
982         PROCLOCK   *holder;
983         PROCLOCKTAG holdertag;
984         HTAB       *holderTable;
985         bool            wakeupNeeded = false;
986
987 #ifdef LOCK_DEBUG
988         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
989                 elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
990 #endif
991
992         /* ???????? This must be changed when short term locks will be used */
993         locktag->lockmethod = lockmethod;
994
995         Assert(lockmethod < NumLockMethods);
996         lockMethodTable = LockMethodTable[lockmethod];
997         if (!lockMethodTable)
998         {
999                 elog(WARNING, "lockMethodTable is null in LockRelease");
1000                 return FALSE;
1001         }
1002
1003         masterLock = lockMethodTable->masterLock;
1004         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1005
1006         /*
1007          * Find a lock with this tag
1008          */
1009         Assert(lockMethodTable->lockHash->hash == tag_hash);
1010         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1011                                                                 (void *) locktag,
1012                                                                 HASH_FIND, NULL);
1013
1014         /*
1015          * let the caller print its own error message, too. Do not
1016          * elog(ERROR).
1017          */
1018         if (!lock)
1019         {
1020                 LWLockRelease(masterLock);
1021                 elog(WARNING, "LockRelease: no such lock");
1022                 return FALSE;
1023         }
1024         LOCK_PRINT("LockRelease: found", lock, lockmode);
1025
1026         /*
1027          * Find the holder entry for this holder.
1028          */
1029         MemSet(&holdertag, 0, sizeof(PROCLOCKTAG)); /* must clear padding,
1030                                                                                                  * needed */
1031         holdertag.lock = MAKE_OFFSET(lock);
1032         holdertag.proc = MAKE_OFFSET(MyProc);
1033         TransactionIdStore(xid, &holdertag.xid);
1034
1035         holderTable = lockMethodTable->holderHash;
1036         holder = (PROCLOCK *) hash_search(holderTable,
1037                                                                           (void *) &holdertag,
1038                                                                           HASH_FIND_SAVE, NULL);
1039         if (!holder)
1040         {
1041                 LWLockRelease(masterLock);
1042 #ifdef USER_LOCKS
1043                 if (lockmethod == USER_LOCKMETHOD)
1044                         elog(WARNING, "LockRelease: no lock with this tag");
1045                 else
1046 #endif
1047                         elog(WARNING, "LockRelease: holder table corrupted");
1048                 return FALSE;
1049         }
1050         PROCLOCK_PRINT("LockRelease: found", holder);
1051
1052         /*
1053          * Check that we are actually holding a lock of the type we want to
1054          * release.
1055          */
1056         if (!(holder->holding[lockmode] > 0))
1057         {
1058                 PROCLOCK_PRINT("LockRelease: WRONGTYPE", holder);
1059                 Assert(holder->holding[lockmode] >= 0);
1060                 LWLockRelease(masterLock);
1061                 elog(WARNING, "LockRelease: you don't own a lock of type %s",
1062                          lock_mode_names[lockmode]);
1063                 return FALSE;
1064         }
1065         Assert(holder->nHolding > 0);
1066         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1067         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1068         Assert(lock->nGranted <= lock->nRequested);
1069
1070         /*
1071          * fix the general lock stats
1072          */
1073         lock->nRequested--;
1074         lock->requested[lockmode]--;
1075         lock->nGranted--;
1076         lock->granted[lockmode]--;
1077
1078         if (lock->granted[lockmode] == 0)
1079         {
1080                 /* change the conflict mask.  No more of this lock type. */
1081                 lock->grantMask &= BITS_OFF[lockmode];
1082         }
1083
1084         LOCK_PRINT("LockRelease: updated", lock, lockmode);
1085         Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1086         Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1087         Assert(lock->nGranted <= lock->nRequested);
1088
1089         /*
1090          * We need only run ProcLockWakeup if the released lock conflicts with
1091          * at least one of the lock types requested by waiter(s).  Otherwise
1092          * whatever conflict made them wait must still exist.  NOTE: before
1093          * MVCC, we could skip wakeup if lock->granted[lockmode] was still
1094          * positive. But that's not true anymore, because the remaining
1095          * granted locks might belong to some waiter, who could now be
1096          * awakened because he doesn't conflict with his own locks.
1097          */
1098         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1099                 wakeupNeeded = true;
1100
1101         if (lock->nRequested == 0)
1102         {
1103                 /*
1104                  * if there's no one waiting in the queue, we just released the
1105                  * last lock on this object. Delete it from the lock table.
1106                  */
1107                 Assert(lockMethodTable->lockHash->hash == tag_hash);
1108                 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1109                                                                         (void *) &(lock->tag),
1110                                                                         HASH_REMOVE,
1111                                                                         NULL);
1112                 if (!lock)
1113                 {
1114                         LWLockRelease(masterLock);
1115                         elog(WARNING, "LockRelease: remove lock, table corrupted");
1116                         return FALSE;
1117                 }
1118                 wakeupNeeded = false;   /* should be false, but make sure */
1119         }
1120
1121         /*
1122          * Now fix the per-holder lock stats.
1123          */
1124         holder->holding[lockmode]--;
1125         holder->nHolding--;
1126         PROCLOCK_PRINT("LockRelease: updated", holder);
1127         Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
1128
1129         /*
1130          * If this was my last hold on this lock, delete my entry in the
1131          * holder table.
1132          */
1133         if (holder->nHolding == 0)
1134         {
1135                 PROCLOCK_PRINT("LockRelease: deleting", holder);
1136                 SHMQueueDelete(&holder->lockLink);
1137                 SHMQueueDelete(&holder->procLink);
1138                 holder = (PROCLOCK *) hash_search(holderTable,
1139                                                                                   (void *) &holder,
1140                                                                                   HASH_REMOVE_SAVED, NULL);
1141                 if (!holder)
1142                 {
1143                         LWLockRelease(masterLock);
1144                         elog(WARNING, "LockRelease: remove holder, table corrupted");
1145                         return FALSE;
1146                 }
1147         }
1148
1149         /*
1150          * Wake up waiters if needed.
1151          */
1152         if (wakeupNeeded)
1153                 ProcLockWakeup(lockMethodTable, lock);
1154
1155         LWLockRelease(masterLock);
1156         return TRUE;
1157 }
1158
1159 /*
1160  * LockReleaseAll -- Release all locks in a process's lock list.
1161  *
1162  * Well, not really *all* locks.
1163  *
1164  * If 'allxids' is TRUE, all locks of the specified lock method are
1165  * released, regardless of transaction affiliation.
1166  *
1167  * If 'allxids' is FALSE, all locks of the specified lock method and
1168  * specified XID are released.
1169  */
1170 bool
1171 LockReleaseAll(LOCKMETHOD lockmethod, PGPROC *proc,
1172                            bool allxids, TransactionId xid)
1173 {
1174         SHM_QUEUE  *procHolders = &(proc->procHolders);
1175         PROCLOCK   *holder;
1176         PROCLOCK   *nextHolder;
1177         LWLockId        masterLock;
1178         LOCKMETHODTABLE *lockMethodTable;
1179         int                     i,
1180                                 numLockModes;
1181         LOCK       *lock;
1182
1183 #ifdef LOCK_DEBUG
1184         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1185                 elog(LOG, "LockReleaseAll: lockmethod=%d, pid=%d",
1186                          lockmethod, proc->pid);
1187 #endif
1188
1189         Assert(lockmethod < NumLockMethods);
1190         lockMethodTable = LockMethodTable[lockmethod];
1191         if (!lockMethodTable)
1192         {
1193                 elog(WARNING, "LockReleaseAll: bad lockmethod %d", lockmethod);
1194                 return FALSE;
1195         }
1196
1197         numLockModes = lockMethodTable->numLockModes;
1198         masterLock = lockMethodTable->masterLock;
1199
1200         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1201
1202         holder = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
1203                                                                            offsetof(PROCLOCK, procLink));
1204
1205         while (holder)
1206         {
1207                 bool            wakeupNeeded = false;
1208
1209                 /* Get link first, since we may unlink/delete this holder */
1210                 nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &holder->procLink,
1211                                                                                    offsetof(PROCLOCK, procLink));
1212
1213                 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1214
1215                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1216
1217                 /* Ignore items that are not of the lockmethod to be removed */
1218                 if (LOCK_LOCKMETHOD(*lock) != lockmethod)
1219                         goto next_item;
1220
1221                 /* If not allxids, ignore items that are of the wrong xid */
1222                 if (!allxids && !TransactionIdEquals(xid, holder->tag.xid))
1223                         goto next_item;
1224
1225                 PROCLOCK_PRINT("LockReleaseAll", holder);
1226                 LOCK_PRINT("LockReleaseAll", lock, 0);
1227                 Assert(lock->nRequested >= 0);
1228                 Assert(lock->nGranted >= 0);
1229                 Assert(lock->nGranted <= lock->nRequested);
1230                 Assert(holder->nHolding >= 0);
1231                 Assert(holder->nHolding <= lock->nRequested);
1232
1233                 /*
1234                  * fix the general lock stats
1235                  */
1236                 if (lock->nRequested != holder->nHolding)
1237                 {
1238                         for (i = 1; i <= numLockModes; i++)
1239                         {
1240                                 Assert(holder->holding[i] >= 0);
1241                                 if (holder->holding[i] > 0)
1242                                 {
1243                                         lock->requested[i] -= holder->holding[i];
1244                                         lock->granted[i] -= holder->holding[i];
1245                                         Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
1246                                         if (lock->granted[i] == 0)
1247                                                 lock->grantMask &= BITS_OFF[i];
1248
1249                                         /*
1250                                          * Read comments in LockRelease
1251                                          */
1252                                         if (!wakeupNeeded &&
1253                                                 lockMethodTable->conflictTab[i] & lock->waitMask)
1254                                                 wakeupNeeded = true;
1255                                 }
1256                         }
1257                         lock->nRequested -= holder->nHolding;
1258                         lock->nGranted -= holder->nHolding;
1259                         Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1260                         Assert(lock->nGranted <= lock->nRequested);
1261                 }
1262                 else
1263                 {
1264                         /*
1265                          * This holder accounts for all the requested locks on the
1266                          * object, so we can be lazy and just zero things out.
1267                          */
1268                         lock->nRequested = 0;
1269                         lock->nGranted = 0;
1270                         /* Fix the lock status, just for next LOCK_PRINT message. */
1271                         for (i = 1; i <= numLockModes; i++)
1272                         {
1273                                 Assert(lock->requested[i] == lock->granted[i]);
1274                                 lock->requested[i] = lock->granted[i] = 0;
1275                         }
1276                 }
1277                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1278
1279                 PROCLOCK_PRINT("LockReleaseAll: deleting", holder);
1280
1281                 /*
1282                  * Remove the holder entry from the linked lists
1283                  */
1284                 SHMQueueDelete(&holder->lockLink);
1285                 SHMQueueDelete(&holder->procLink);
1286
1287                 /*
1288                  * remove the holder entry from the hashtable
1289                  */
1290                 holder = (PROCLOCK *) hash_search(lockMethodTable->holderHash,
1291                                                                                   (void *) holder,
1292                                                                                   HASH_REMOVE,
1293                                                                                   NULL);
1294                 if (!holder)
1295                 {
1296                         LWLockRelease(masterLock);
1297                         elog(WARNING, "LockReleaseAll: holder table corrupted");
1298                         return FALSE;
1299                 }
1300
1301                 if (lock->nRequested == 0)
1302                 {
1303                         /*
1304                          * We've just released the last lock, so garbage-collect the
1305                          * lock object.
1306                          */
1307                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1308                         Assert(lockMethodTable->lockHash->hash == tag_hash);
1309                         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1310                                                                                 (void *) &(lock->tag),
1311                                                                                 HASH_REMOVE, NULL);
1312                         if (!lock)
1313                         {
1314                                 LWLockRelease(masterLock);
1315                                 elog(WARNING, "LockReleaseAll: cannot remove lock from HTAB");
1316                                 return FALSE;
1317                         }
1318                 }
1319                 else if (wakeupNeeded)
1320                         ProcLockWakeup(lockMethodTable, lock);
1321
1322 next_item:
1323                 holder = nextHolder;
1324         }
1325
1326         LWLockRelease(masterLock);
1327
1328 #ifdef LOCK_DEBUG
1329         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1330                 elog(LOG, "LockReleaseAll: done");
1331 #endif
1332
1333         return TRUE;
1334 }
1335
1336 int
1337 LockShmemSize(int maxBackends)
1338 {
1339         int                     size = 0;
1340         long            max_table_size = NLOCKENTS(maxBackends);
1341
1342         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1343         size += maxBackends * MAXALIGN(sizeof(PGPROC));         /* each MyProc */
1344         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODTABLE));           /* each lockMethodTable */
1345
1346         /* lockHash table */
1347         size += hash_estimate_size(max_table_size, sizeof(LOCK));
1348
1349         /* holderHash table */
1350         size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
1351
1352         /*
1353          * Since the lockHash entry count above is only an estimate, add 10%
1354          * safety margin.
1355          */
1356         size += size / 10;
1357
1358         return size;
1359 }
1360
1361 /*
1362  * GetLockStatusData - Return a summary of the lock manager's internal
1363  * status, for use in a user-level reporting function.
1364  *
1365  * The return data consists of an array of PROCLOCK objects, with the
1366  * associated PGPROC and LOCK objects for each.  Note that multiple
1367  * copies of the same PGPROC and/or LOCK objects are likely to appear.
1368  * It is the caller's responsibility to match up duplicates if wanted.
1369  *
1370  * The design goal is to hold the LockMgrLock for as short a time as possible;
1371  * thus, this function simply makes a copy of the necessary data and releases
1372  * the lock, allowing the caller to contemplate and format the data for as
1373  * long as it pleases.
1374  */
1375 LockData *
1376 GetLockStatusData(void)
1377 {
1378         LockData   *data;
1379         HTAB       *holderTable;
1380         PROCLOCK   *holder;
1381         HASH_SEQ_STATUS seqstat;
1382         int                     i;
1383
1384         data = (LockData *) palloc(sizeof(LockData));
1385
1386         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
1387
1388         holderTable = LockMethodTable[DEFAULT_LOCKMETHOD]->holderHash;
1389
1390         data->nelements = i = holderTable->hctl->nentries;
1391
1392         if (i == 0)
1393                 i = 1;                                  /* avoid palloc(0) if empty table */
1394
1395         data->holderaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
1396         data->holders = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
1397         data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
1398         data->locks = (LOCK *) palloc(sizeof(LOCK) * i);
1399
1400         hash_seq_init(&seqstat, holderTable);
1401
1402         i = 0;
1403         while ((holder = hash_seq_search(&seqstat)))
1404         {
1405                 PGPROC     *proc = (PGPROC *) MAKE_PTR(holder->tag.proc);
1406                 LOCK       *lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1407
1408                 data->holderaddrs[i] = MAKE_OFFSET(holder);
1409                 memcpy(&(data->holders[i]), holder, sizeof(PROCLOCK));
1410                 memcpy(&(data->procs[i]), proc, sizeof(PGPROC));
1411                 memcpy(&(data->locks[i]), lock, sizeof(LOCK));
1412
1413                 i++;
1414         }
1415
1416         LWLockRelease(LockMgrLock);
1417
1418         Assert(i == data->nelements);
1419
1420         return data;
1421 }
1422
1423 /* Provide the textual name of any lock mode */
1424 const char *
1425 GetLockmodeName(LOCKMODE mode)
1426 {
1427         Assert(mode <= MAX_LOCKMODES);
1428         return lock_mode_names[mode];
1429 }
1430
1431 #ifdef LOCK_DEBUG
1432 /*
1433  * Dump all locks in the proc->procHolders list.
1434  *
1435  * Must have already acquired the masterLock.
1436  */
1437 void
1438 DumpLocks(void)
1439 {
1440         PGPROC     *proc;
1441         SHM_QUEUE  *procHolders;
1442         PROCLOCK   *holder;
1443         LOCK       *lock;
1444         int                     lockmethod = DEFAULT_LOCKMETHOD;
1445         LOCKMETHODTABLE *lockMethodTable;
1446
1447         proc = MyProc;
1448         if (proc == NULL)
1449                 return;
1450
1451         procHolders = &proc->procHolders;
1452
1453         Assert(lockmethod < NumLockMethods);
1454         lockMethodTable = LockMethodTable[lockmethod];
1455         if (!lockMethodTable)
1456                 return;
1457
1458         if (proc->waitLock)
1459                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1460
1461         holder = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
1462                                                                            offsetof(PROCLOCK, procLink));
1463
1464         while (holder)
1465         {
1466                 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1467
1468                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1469
1470                 PROCLOCK_PRINT("DumpLocks", holder);
1471                 LOCK_PRINT("DumpLocks", lock, 0);
1472
1473                 holder = (PROCLOCK *) SHMQueueNext(procHolders, &holder->procLink,
1474                                                                                    offsetof(PROCLOCK, procLink));
1475         }
1476 }
1477
1478 /*
1479  * Dump all postgres locks. Must have already acquired the masterLock.
1480  */
1481 void
1482 DumpAllLocks(void)
1483 {
1484         PGPROC     *proc;
1485         PROCLOCK   *holder;
1486         LOCK       *lock;
1487         int                     lockmethod = DEFAULT_LOCKMETHOD;
1488         LOCKMETHODTABLE *lockMethodTable;
1489         HTAB       *holderTable;
1490         HASH_SEQ_STATUS status;
1491
1492         proc = MyProc;
1493         if (proc == NULL)
1494                 return;
1495
1496         Assert(lockmethod < NumLockMethods);
1497         lockMethodTable = LockMethodTable[lockmethod];
1498         if (!lockMethodTable)
1499                 return;
1500
1501         holderTable = lockMethodTable->holderHash;
1502
1503         if (proc->waitLock)
1504                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1505
1506         hash_seq_init(&status, holderTable);
1507         while ((holder = (PROCLOCK *) hash_seq_search(&status)) != NULL)
1508         {
1509                 PROCLOCK_PRINT("DumpAllLocks", holder);
1510
1511                 if (holder->tag.lock)
1512                 {
1513                         lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1514                         LOCK_PRINT("DumpAllLocks", lock, 0);
1515                 }
1516                 else
1517                         elog(LOG, "DumpAllLocks: holder->tag.lock = NULL");
1518         }
1519 }
1520
1521 #endif   /* LOCK_DEBUG */