]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Fix user locks. Broken some time ago for all platforms by Windows-related
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.136 2004/08/26 17:23:30 tgl Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll(),
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <signal.h>
34 #include <unistd.h>
35
36 #include "access/xact.h"
37 #include "miscadmin.h"
38 #include "storage/proc.h"
39 #include "utils/memutils.h"
40 #include "utils/ps_status.h"
41 #include "utils/resowner.h"
42
43
44 /* This configuration variable is used to set the lock table size */
45 int                     max_locks_per_xact; /* set by guc.c */
46
47 #define NLOCKENTS(maxBackends)  (max_locks_per_xact * (maxBackends))
48
49
50 static int WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode,
51                    LOCK *lock, PROCLOCK *proclock);
52 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
53                                  int *myHolding);
54
55 static char *lock_mode_names[] =
56 {
57         "INVALID",
58         "AccessShareLock",
59         "RowShareLock",
60         "RowExclusiveLock",
61         "ShareUpdateExclusiveLock",
62         "ShareLock",
63         "ShareRowExclusiveLock",
64         "ExclusiveLock",
65         "AccessExclusiveLock"
66 };
67
68
69 #ifdef LOCK_DEBUG
70
71 /*------
72  * The following configuration options are available for lock debugging:
73  *
74  *         TRACE_LOCKS          -- give a bunch of output what's going on in this file
75  *         TRACE_USERLOCKS      -- same but for user locks
76  *         TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
77  *                                                 (use to avoid output on system tables)
78  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
79  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
80  *
81  * Furthermore, but in storage/lmgr/lwlock.c:
82  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
83  *
84  * Define LOCK_DEBUG at compile time to get all these enabled.
85  * --------
86  */
87
88 int                     Trace_lock_oidmin = BootstrapObjectIdData;
89 bool            Trace_locks = false;
90 bool            Trace_userlocks = false;
91 int                     Trace_lock_table = 0;
92 bool            Debug_deadlocks = false;
93
94
95 inline static bool
96 LOCK_DEBUG_ENABLED(const LOCK *lock)
97 {
98         return
99                 (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
100            || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
101                  && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
102                 || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
103 }
104
105
106 inline static void
107 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
108 {
109         if (LOCK_DEBUG_ENABLED(lock))
110                 elog(LOG,
111                          "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
112                          "req(%d,%d,%d,%d,%d,%d,%d)=%d "
113                          "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
114                          where, MAKE_OFFSET(lock),
115                          lock->tag.lockmethodid, lock->tag.relId, lock->tag.dbId,
116                          lock->tag.objId.blkno, lock->grantMask,
117                          lock->requested[1], lock->requested[2], lock->requested[3],
118                          lock->requested[4], lock->requested[5], lock->requested[6],
119                          lock->requested[7], lock->nRequested,
120                          lock->granted[1], lock->granted[2], lock->granted[3],
121                          lock->granted[4], lock->granted[5], lock->granted[6],
122                          lock->granted[7], lock->nGranted,
123                          lock->waitProcs.size, lock_mode_names[type]);
124 }
125
126
127 inline static void
128 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
129 {
130         if (
131                 (((PROCLOCK_LOCKMETHOD(*proclockP) == DEFAULT_LOCKMETHOD && Trace_locks)
132                   || (PROCLOCK_LOCKMETHOD(*proclockP) == USER_LOCKMETHOD && Trace_userlocks))
133                  && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
134                 || (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table))
135                 )
136                 elog(LOG,
137                          "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
138                          where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
139                          PROCLOCK_LOCKMETHOD(*(proclockP)),
140                          proclockP->tag.proc, proclockP->tag.xid,
141                          proclockP->holding[1], proclockP->holding[2], proclockP->holding[3],
142                          proclockP->holding[4], proclockP->holding[5], proclockP->holding[6],
143                          proclockP->holding[7], proclockP->nHolding);
144 }
145
146 #else                                                   /* not LOCK_DEBUG */
147
148 #define LOCK_PRINT(where, lock, type)
149 #define PROCLOCK_PRINT(where, proclockP)
150 #endif   /* not LOCK_DEBUG */
151
152
153 /*
154  * map from lock method id to the lock table structure
155  */
156 static LockMethod LockMethods[MAX_LOCK_METHODS];
157 static HTAB*    LockMethodLockHash[MAX_LOCK_METHODS];
158 static HTAB*    LockMethodProcLockHash[MAX_LOCK_METHODS];
159
160 /* exported so lmgr.c can initialize it */
161 int             NumLockMethods;
162
163
164 /*
165  * InitLocks -- Init the lock module.  Create a private data
166  *              structure for constructing conflict masks.
167  */
168 void
169 InitLocks(void)
170 {
171         /* NOP */
172 }
173
174
175 /*
176  * Fetch the lock method table associated with a given lock
177  */
178 LockMethod
179 GetLocksMethodTable(LOCK *lock)
180 {
181         LOCKMETHODID    lockmethodid = LOCK_LOCKMETHOD(*lock);
182
183         Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
184         return LockMethods[lockmethodid];
185 }
186
187
188 /*
189  * LockMethodInit -- initialize the lock table's lock type
190  *              structures
191  *
192  * Notes: just copying.  Should only be called once.
193  */
194 static void
195 LockMethodInit(LockMethod lockMethodTable,
196                            const LOCKMASK *conflictsP,
197                            int numModes)
198 {
199         int                     i;
200
201         lockMethodTable->numLockModes = numModes;
202         /* copies useless zero element as well as the N lockmodes */
203         for (i = 0; i <= numModes; i++)
204                 lockMethodTable->conflictTab[i] = conflictsP[i];
205 }
206
207 /*
208  * LockMethodTableInit -- initialize a lock table structure
209  *
210  * NOTE: data structures allocated here are allocated permanently, using
211  * TopMemoryContext and shared memory.  We don't ever release them anyway,
212  * and in normal multi-backend operation the lock table structures set up
213  * by the postmaster are inherited by each backend, so they must be in
214  * TopMemoryContext.
215  */
216 LOCKMETHODID
217 LockMethodTableInit(const char *tabName,
218                                         const LOCKMASK *conflictsP,
219                                         int numModes,
220                                         int maxBackends)
221 {
222         LockMethod      newLockMethod;
223         char       *shmemName;
224         HASHCTL         info;
225         int                     hash_flags;
226         bool            found;
227         long            init_table_size,
228                                 max_table_size;
229
230         if (numModes >= MAX_LOCKMODES)
231                 elog(ERROR, "too many lock types %d (limit is %d)",
232                          numModes, MAX_LOCKMODES-1);
233
234         /* Compute init/max size to request for lock hashtables */
235         max_table_size = NLOCKENTS(maxBackends);
236         init_table_size = max_table_size / 10;
237
238         /* Allocate a string for the shmem index table lookups. */
239         /* This is just temp space in this routine, so palloc is OK. */
240         shmemName = (char *) palloc(strlen(tabName) + 32);
241
242         /* each lock table has a header in shared memory */
243         sprintf(shmemName, "%s (lock method table)", tabName);
244         newLockMethod = (LockMethod)
245                 ShmemInitStruct(shmemName, sizeof(LockMethodData), &found);
246
247         if (!newLockMethod)
248                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
249
250         /*
251          * we're first - initialize
252          */
253         if (!found)
254         {
255                 MemSet(newLockMethod, 0, sizeof(LockMethodData));
256                 newLockMethod->masterLock = LockMgrLock;
257                 newLockMethod->lockmethodid = NumLockMethods;
258                 LockMethodInit(newLockMethod, conflictsP, numModes);
259         }
260
261         /*
262          * other modules refer to the lock table by a lockmethod ID
263          */
264         LockMethods[NumLockMethods] = newLockMethod;
265         NumLockMethods++;
266         Assert(NumLockMethods <= MAX_LOCK_METHODS);
267
268         /*
269          * allocate a hash table for LOCK structs.      This is used to store
270          * per-locked-object information.
271          */
272         info.keysize = sizeof(LOCKTAG);
273         info.entrysize = sizeof(LOCK);
274         info.hash = tag_hash;
275         hash_flags = (HASH_ELEM | HASH_FUNCTION);
276
277         sprintf(shmemName, "%s (lock hash)", tabName);
278         LockMethodLockHash[NumLockMethods-1] = ShmemInitHash(shmemName,
279                                                                                         init_table_size,
280                                                                                         max_table_size,
281                                                                                         &info,
282                                                                                         hash_flags);
283
284         if (!LockMethodLockHash[NumLockMethods-1])
285                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
286         Assert(LockMethodLockHash[NumLockMethods-1]->hash == tag_hash);
287
288         /*
289          * allocate a hash table for PROCLOCK structs.  This is used to store
290          * per-lock-proclock information.
291          */
292         info.keysize = sizeof(PROCLOCKTAG);
293         info.entrysize = sizeof(PROCLOCK);
294         info.hash = tag_hash;
295         hash_flags = (HASH_ELEM | HASH_FUNCTION);
296
297         sprintf(shmemName, "%s (proclock hash)", tabName);
298         LockMethodProcLockHash[NumLockMethods-1] = ShmemInitHash(shmemName,
299                                                                                                 init_table_size,
300                                                                                                 max_table_size,
301                                                                                                 &info,
302                                                                                                 hash_flags);
303
304         if (!LockMethodProcLockHash[NumLockMethods-1])
305                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
306
307         pfree(shmemName);
308
309         return newLockMethod->lockmethodid;
310 }
311
312 /*
313  * LockMethodTableRename -- allocate another lockmethod ID to the same
314  *              lock table.
315  *
316  * NOTES: Both the lock module and the lock chain (lchain.c)
317  *              module use table id's to distinguish between different
318  *              kinds of locks.  Short term and long term locks look
319  *              the same to the lock table, but are handled differently
320  *              by the lock chain manager.      This function allows the
321  *              client to use different lockmethods when acquiring/releasing
322  *              short term and long term locks, yet store them all in one hashtable.
323  */
324
325 LOCKMETHODID
326 LockMethodTableRename(LOCKMETHODID lockmethodid)
327 {
328         LOCKMETHODID    newLockMethodId;
329
330         if (NumLockMethods >= MAX_LOCK_METHODS)
331                 return INVALID_LOCKMETHOD;
332         if (LockMethods[lockmethodid] == INVALID_LOCKMETHOD)
333                 return INVALID_LOCKMETHOD;
334
335         /* other modules refer to the lock table by a lockmethod ID */
336         newLockMethodId = NumLockMethods;
337         NumLockMethods++;
338
339         LockMethods[newLockMethodId] = LockMethods[lockmethodid];
340         LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid];
341         LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid];
342
343         return newLockMethodId;
344 }
345
346 /*
347  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
348  *              set lock if/when no conflicts.
349  *
350  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
351  *              a FALSE return is to be expected if dontWait is TRUE;
352  *              but if dontWait is FALSE, only a parameter error can cause
353  *              a FALSE return.  (XXX probably we should just ereport on parameter
354  *              errors, instead of conflating this with failure to acquire lock?)
355  *
356  * Side Effects: The lock is acquired and recorded in lock tables.
357  *
358  * NOTE: if we wait for the lock, there is no way to abort the wait
359  * short of aborting the transaction.
360  *
361  *
362  * Note on User Locks:
363  *
364  *              User locks are handled totally on the application side as
365  *              long term cooperative locks which extend beyond the normal
366  *              transaction boundaries.  Their purpose is to indicate to an
367  *              application that someone is `working' on an item.  So it is
368  *              possible to put an user lock on a tuple's oid, retrieve the
369  *              tuple, work on it for an hour and then update it and remove
370  *              the lock.  While the lock is active other clients can still
371  *              read and write the tuple but they can be aware that it has
372  *              been locked at the application level by someone.
373  *              User locks use lock tags made of an uint16 and an uint32, for
374  *              example 0 and a tuple oid, or any other arbitrary pair of
375  *              numbers following a convention established by the application.
376  *              In this sense tags don't refer to tuples or database entities.
377  *              User locks and normal locks are completely orthogonal and
378  *              they don't interfere with each other, so it is possible
379  *              to acquire a normal lock on an user-locked tuple or user-lock
380  *              a tuple for which a normal write lock already exists.
381  *              User locks are always non blocking, therefore they are never
382  *              acquired if already held by another process.  They must be
383  *              released explicitly by the application but they are released
384  *              automatically when a backend terminates.
385  *              They are indicated by a lockmethod 2 which is an alias for the
386  *              normal lock table, and are distinguished from normal locks
387  *              by the following differences:
388  *
389  *                                                                              normal lock             user lock
390  *
391  *              lockmethodid                                    1                               2
392  *              tag.dbId                                                database oid    database oid
393  *              tag.relId                                               rel oid or 0    0
394  *              tag.objId                                               block id                lock id2
395  *                                                                              or xact id
396  *              tag.offnum                                              0                               lock id1
397  *              proclock.xid                                    xid or 0                0
398  *              persistence                                             transaction             user or backend
399  *                                                                              or backend
400  *
401  *              The lockmode parameter can have the same values for normal locks
402  *              although probably only WRITE_LOCK can have some practical use.
403  *
404  *                                                                                                              DZ - 22 Nov 1997
405  */
406
407 bool
408 LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
409                         TransactionId xid, LOCKMODE lockmode, bool dontWait)
410 {
411         PROCLOCK   *proclock;
412         PROCLOCKTAG proclocktag;
413         HTAB       *proclockTable;
414         bool            found;
415         LOCK       *lock;
416         LWLockId        masterLock;
417         LockMethod      lockMethodTable;
418         int                     status;
419         int                     myHolding[MAX_LOCKMODES];
420         int                     i;
421
422 #ifdef LOCK_DEBUG
423         if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks)
424                 elog(LOG, "LockAcquire: user lock [%u] %s",
425                          locktag->objId.blkno, lock_mode_names[lockmode]);
426 #endif
427
428         /* ???????? This must be changed when short term locks will be used */
429         locktag->lockmethodid = lockmethodid;
430
431         /* Prepare to record the lock in the current resource owner */
432         ResourceOwnerEnlargeLocks(CurrentResourceOwner);
433
434         Assert(lockmethodid < NumLockMethods);
435         lockMethodTable = LockMethods[lockmethodid];
436         if (!lockMethodTable)
437         {
438                 elog(WARNING, "bad lock table id: %d", lockmethodid);
439                 return FALSE;
440         }
441
442         masterLock = lockMethodTable->masterLock;
443
444         LWLockAcquire(masterLock, LW_EXCLUSIVE);
445
446         /*
447          * Find or create a lock with this tag
448          */
449         Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash);
450         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
451                                                                 (void *) locktag,
452                                                                 HASH_ENTER, &found);
453         if (!lock)
454         {
455                 LWLockRelease(masterLock);
456                 ereport(ERROR,
457                                 (errcode(ERRCODE_OUT_OF_MEMORY),
458                                  errmsg("out of shared memory"),
459                                  errhint("You may need to increase max_locks_per_transaction.")));
460         }
461
462         /*
463          * if it's a new lock object, initialize it
464          */
465         if (!found)
466         {
467                 lock->grantMask = 0;
468                 lock->waitMask = 0;
469                 SHMQueueInit(&(lock->lockHolders));
470                 ProcQueueInit(&(lock->waitProcs));
471                 lock->nRequested = 0;
472                 lock->nGranted = 0;
473                 MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
474                 MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
475                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
476         }
477         else
478         {
479                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
480                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
481                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
482                 Assert(lock->nGranted <= lock->nRequested);
483         }
484
485         /*
486          * Create the hash key for the proclock table.
487          */
488         MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));           /* must clear padding,
489                                                                                                                  * needed */
490         proclocktag.lock = MAKE_OFFSET(lock);
491         proclocktag.proc = MAKE_OFFSET(MyProc);
492         TransactionIdStore(xid, &proclocktag.xid);
493
494         /*
495          * Find or create a proclock entry with this tag
496          */
497         proclockTable = LockMethodProcLockHash[lockmethodid];
498         proclock = (PROCLOCK *) hash_search(proclockTable,
499                                                                                 (void *) &proclocktag,
500                                                                                 HASH_ENTER, &found);
501         if (!proclock)
502         {
503                 LWLockRelease(masterLock);
504                 ereport(ERROR,
505                                 (errcode(ERRCODE_OUT_OF_MEMORY),
506                                  errmsg("out of shared memory"),
507                                  errhint("You may need to increase max_locks_per_transaction.")));
508         }
509
510         /*
511          * If new, initialize the new entry
512          */
513         if (!found)
514         {
515                 proclock->nHolding = 0;
516                 MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES);
517                 /* Add proclock to appropriate lists */
518                 SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink);
519                 SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink);
520                 PROCLOCK_PRINT("LockAcquire: new", proclock);
521         }
522         else
523         {
524                 PROCLOCK_PRINT("LockAcquire: found", proclock);
525                 Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
526                 Assert(proclock->nHolding <= lock->nGranted);
527
528 #ifdef CHECK_DEADLOCK_RISK
529
530                 /*
531                  * Issue warning if we already hold a lower-level lock on this
532                  * object and do not hold a lock of the requested level or higher.
533                  * This indicates a deadlock-prone coding practice (eg, we'd have
534                  * a deadlock if another backend were following the same code path
535                  * at about the same time).
536                  *
537                  * This is not enabled by default, because it may generate log
538                  * entries about user-level coding practices that are in fact safe
539                  * in context. It can be enabled to help find system-level
540                  * problems.
541                  *
542                  * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
543                  * better to use a table.  For now, though, this works.
544                  */
545                 for (i = lockMethodTable->numLockModes; i > 0; i--)
546                 {
547                         if (proclock->holding[i] > 0)
548                         {
549                                 if (i >= (int) lockmode)
550                                         break;          /* safe: we have a lock >= req level */
551                                 elog(LOG, "deadlock risk: raising lock level"
552                                          " from %s to %s on object %u/%u/%u",
553                                          lock_mode_names[i], lock_mode_names[lockmode],
554                                  lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
555                                 break;
556                         }
557                 }
558 #endif   /* CHECK_DEADLOCK_RISK */
559         }
560
561         /*
562          * lock->nRequested and lock->requested[] count the total number of
563          * requests, whether granted or waiting, so increment those
564          * immediately. The other counts don't increment till we get the lock.
565          */
566         lock->nRequested++;
567         lock->requested[lockmode]++;
568         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
569
570         /*
571          * If I already hold one or more locks of the requested type, just
572          * grant myself another one without blocking.
573          */
574         if (proclock->holding[lockmode] > 0)
575         {
576                 GrantLock(lock, proclock, lockmode);
577                 ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid,
578                                                                   lockmode);
579                 PROCLOCK_PRINT("LockAcquire: owning", proclock);
580                 LWLockRelease(masterLock);
581                 return TRUE;
582         }
583
584         /*
585          * If this process (under any XID) is a proclock of the lock, also
586          * grant myself another one without blocking.
587          */
588         LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
589         if (myHolding[lockmode] > 0)
590         {
591                 GrantLock(lock, proclock, lockmode);
592                 ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid,
593                                                                   lockmode);
594                 PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
595                 LWLockRelease(masterLock);
596                 return TRUE;
597         }
598
599         /*
600          * If lock requested conflicts with locks requested by waiters, must
601          * join wait queue.  Otherwise, check for conflict with already-held
602          * locks.  (That's last because most complex check.)
603          */
604         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
605                 status = STATUS_FOUND;
606         else
607                 status = LockCheckConflicts(lockMethodTable, lockmode,
608                                                                         lock, proclock,
609                                                                         MyProc, myHolding);
610
611         if (status == STATUS_OK)
612         {
613                 /* No conflict with held or previously requested locks */
614                 GrantLock(lock, proclock, lockmode);
615                 ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid,
616                                                                   lockmode);
617         }
618         else
619         {
620                 Assert(status == STATUS_FOUND);
621
622                 /*
623                  * We can't acquire the lock immediately.  If caller specified no
624                  * blocking, remove the proclock entry and return FALSE without
625                  * waiting.
626                  */
627                 if (dontWait)
628                 {
629                         if (proclock->nHolding == 0)
630                         {
631                                 SHMQueueDelete(&proclock->lockLink);
632                                 SHMQueueDelete(&proclock->procLink);
633                                 proclock = (PROCLOCK *) hash_search(proclockTable,
634                                                                                                         (void *) proclock,
635                                                                                                         HASH_REMOVE, NULL);
636                                 if (!proclock)
637                                         elog(WARNING, "proclock table corrupted");
638                         }
639                         else
640                                 PROCLOCK_PRINT("LockAcquire: NHOLDING", proclock);
641                         lock->nRequested--;
642                         lock->requested[lockmode]--;
643                         LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
644                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
645                         Assert(lock->nGranted <= lock->nRequested);
646                         LWLockRelease(masterLock);
647                         return FALSE;
648                 }
649
650                 /*
651                  * Construct bitmask of locks this process holds on this object.
652                  */
653                 {
654                         LOCKMASK                heldLocks = 0;
655
656                         for (i = 1; i <= lockMethodTable->numLockModes; i++)
657                         {
658                                 if (myHolding[i] > 0)
659                                         heldLocks |= LOCKBIT_ON(i);
660                         }
661                         MyProc->heldLocks = heldLocks;
662                 }
663
664                 /*
665                  * Sleep till someone wakes me up.
666                  */
667                 status = WaitOnLock(lockmethodid, lockmode, lock, proclock);
668
669                 /*
670                  * NOTE: do not do any material change of state between here and
671                  * return.      All required changes in locktable state must have been
672                  * done when the lock was granted to us --- see notes in
673                  * WaitOnLock.
674                  */
675
676                 /*
677                  * Check the proclock entry status, in case something in the ipc
678                  * communication doesn't work correctly.
679                  */
680                 if (!((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0)))
681                 {
682                         PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
683                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
684                         /* Should we retry ? */
685                         LWLockRelease(masterLock);
686                         return FALSE;
687                 }
688                 PROCLOCK_PRINT("LockAcquire: granted", proclock);
689                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
690         }
691
692         LWLockRelease(masterLock);
693
694         return status == STATUS_OK;
695 }
696
697 /*
698  * LockCheckConflicts -- test whether requested lock conflicts
699  *              with those already granted
700  *
701  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
702  *
703  * NOTES:
704  *              Here's what makes this complicated: one process's locks don't
705  * conflict with one another, even if they are held under different
706  * transaction IDs (eg, session and xact locks do not conflict).
707  * So, we must subtract off our own locks when determining whether the
708  * requested new lock conflicts with those already held.
709  *
710  * The caller can optionally pass the process's total holding counts, if
711  * known.  If NULL is passed then these values will be computed internally.
712  */
713 int
714 LockCheckConflicts(LockMethod lockMethodTable,
715                                    LOCKMODE lockmode,
716                                    LOCK *lock,
717                                    PROCLOCK *proclock,
718                                    PGPROC *proc,
719                                    int *myHolding)              /* myHolding[] array or NULL */
720 {
721         int                     numLockModes = lockMethodTable->numLockModes;
722         LOCKMASK        bitmask;
723         int                     i;
724         int                     localHolding[MAX_LOCKMODES];
725
726         /*
727          * first check for global conflicts: If no locks conflict with my
728          * request, then I get the lock.
729          *
730          * Checking for conflict: lock->grantMask represents the types of
731          * currently held locks.  conflictTable[lockmode] has a bit set for
732          * each type of lock that conflicts with request.       Bitwise compare
733          * tells if there is a conflict.
734          */
735         if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
736         {
737                 PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
738                 return STATUS_OK;
739         }
740
741         /*
742          * Rats.  Something conflicts. But it could still be my own lock.  We
743          * have to construct a conflict mask that does not reflect our own
744          * locks.  Locks held by the current process under another XID also
745          * count as "our own locks".
746          */
747         if (myHolding == NULL)
748         {
749                 /* Caller didn't do calculation of total holding for me */
750                 LockCountMyLocks(proclock->tag.lock, proc, localHolding);
751                 myHolding = localHolding;
752         }
753
754         /* Compute mask of lock types held by other processes */
755         bitmask = 0;
756         for (i = 1; i <= numLockModes; i++)
757         {
758                 if (lock->granted[i] != myHolding[i])
759                         bitmask |= LOCKBIT_ON(i);
760         }
761
762         /*
763          * now check again for conflicts.  'bitmask' describes the types of
764          * locks held by other processes.  If one of these conflicts with the
765          * kind of lock that I want, there is a conflict and I have to sleep.
766          */
767         if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
768         {
769                 /* no conflict. OK to get the lock */
770                 PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
771                 return STATUS_OK;
772         }
773
774         PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
775         return STATUS_FOUND;
776 }
777
778 /*
779  * LockCountMyLocks --- Count total number of locks held on a given lockable
780  *              object by a given process (under any transaction ID).
781  *
782  * XXX This could be rather slow if the process holds a large number of locks.
783  * Perhaps it could be sped up if we kept yet a third hashtable of per-
784  * process lock information.  However, for the normal case where a transaction
785  * doesn't hold a large number of locks, keeping such a table would probably
786  * be a net slowdown.
787  */
788 static void
789 LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
790 {
791         SHM_QUEUE  *procHolders = &(proc->procHolders);
792         PROCLOCK   *proclock;
793         int                     i;
794
795         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
796
797         proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
798                                                                                  offsetof(PROCLOCK, procLink));
799
800         while (proclock)
801         {
802                 if (lockOffset == proclock->tag.lock)
803                 {
804                         for (i = 1; i < MAX_LOCKMODES; i++)
805                                 myHolding[i] += proclock->holding[i];
806                 }
807
808                 proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
809                                                                                    offsetof(PROCLOCK, procLink));
810         }
811 }
812
813 /*
814  * GrantLock -- update the lock and proclock data structures to show
815  *              the lock request has been granted.
816  *
817  * NOTE: if proc was blocked, it also needs to be removed from the wait list
818  * and have its waitLock/waitHolder fields cleared.  That's not done here.
819  *
820  * NOTE: the lock also has to be recorded in the current ResourceOwner;
821  * but since we may be awaking some other process, we can't do that here.
822  */
823 void
824 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
825 {
826         lock->nGranted++;
827         lock->granted[lockmode]++;
828         lock->grantMask |= LOCKBIT_ON(lockmode);
829         if (lock->granted[lockmode] == lock->requested[lockmode])
830                 lock->waitMask &= LOCKBIT_OFF(lockmode);
831         LOCK_PRINT("GrantLock", lock, lockmode);
832         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
833         Assert(lock->nGranted <= lock->nRequested);
834         proclock->holding[lockmode]++;
835         proclock->nHolding++;
836         Assert((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0));
837 }
838
839 /*
840  * WaitOnLock -- wait to acquire a lock
841  *
842  * Caller must have set MyProc->heldLocks to reflect locks already held
843  * on the lockable object by this process (under all XIDs).
844  *
845  * The locktable's masterLock must be held at entry.
846  */
847 static int
848 WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode,
849                    LOCK *lock, PROCLOCK *proclock)
850 {
851         LockMethod      lockMethodTable = LockMethods[lockmethodid];
852         char       *new_status,
853                            *old_status;
854
855         Assert(lockmethodid < NumLockMethods);
856
857         LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
858
859         old_status = pstrdup(get_ps_display());
860         new_status = (char *) palloc(strlen(old_status) + 10);
861         strcpy(new_status, old_status);
862         strcat(new_status, " waiting");
863         set_ps_display(new_status);
864
865         /*
866          * NOTE: Think not to put any shared-state cleanup after the call to
867          * ProcSleep, in either the normal or failure path.  The lock state
868          * must be fully set by the lock grantor, or by CheckDeadLock if we
869          * give up waiting for the lock.  This is necessary because of the
870          * possibility that a cancel/die interrupt will interrupt ProcSleep
871          * after someone else grants us the lock, but before we've noticed it.
872          * Hence, after granting, the locktable state must fully reflect the
873          * fact that we own the lock; we can't do additional work on return.
874          * Contrariwise, if we fail, any cleanup must happen in xact abort
875          * processing, not here, to ensure it will also happen in the
876          * cancel/die case.
877          */
878
879         if (ProcSleep(lockMethodTable,
880                                   lockmode,
881                                   lock,
882                                   proclock) != STATUS_OK)
883         {
884                 /*
885                  * We failed as a result of a deadlock, see CheckDeadLock(). Quit
886                  * now.  Removal of the proclock and lock objects, if no longer
887                  * needed, will happen in xact cleanup (see above for motivation).
888                  */
889                 LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
890                 LWLockRelease(lockMethodTable->masterLock);
891
892                 /*
893                  * Now that we aren't holding the LockMgrLock, we can give an
894                  * error report including details about the detected deadlock.
895                  */
896                 DeadLockReport();
897                 /* not reached */
898         }
899
900         set_ps_display(old_status);
901         pfree(old_status);
902         pfree(new_status);
903
904         LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
905         return STATUS_OK;
906 }
907
908 /*
909  * Remove a proc from the wait-queue it is on
910  * (caller must know it is on one).
911  *
912  * Locktable lock must be held by caller.
913  *
914  * NB: this does not remove the process' proclock object, nor the lock object,
915  * even though their counts might now have gone to zero.  That will happen
916  * during a subsequent LockReleaseAll call, which we expect will happen
917  * during transaction cleanup.  (Removal of a proc from its wait queue by
918  * this routine can only happen if we are aborting the transaction.)
919  */
920 void
921 RemoveFromWaitQueue(PGPROC *proc)
922 {
923         LOCK       *waitLock = proc->waitLock;
924         LOCKMODE        lockmode = proc->waitLockMode;
925
926         /* Make sure proc is waiting */
927         Assert(proc->links.next != INVALID_OFFSET);
928         Assert(waitLock);
929         Assert(waitLock->waitProcs.size > 0);
930
931         /* Remove proc from lock's wait queue */
932         SHMQueueDelete(&(proc->links));
933         waitLock->waitProcs.size--;
934
935         /* Undo increments of request counts by waiting process */
936         Assert(waitLock->nRequested > 0);
937         Assert(waitLock->nRequested > proc->waitLock->nGranted);
938         waitLock->nRequested--;
939         Assert(waitLock->requested[lockmode] > 0);
940         waitLock->requested[lockmode]--;
941         /* don't forget to clear waitMask bit if appropriate */
942         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
943                 waitLock->waitMask &= LOCKBIT_OFF(lockmode);
944
945         /* Clean up the proc's own state */
946         proc->waitLock = NULL;
947         proc->waitHolder = NULL;
948
949         /* See if any other waiters for the lock can be woken up now */
950         ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
951 }
952
953 /*
954  * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
955  *              release one 'lockmode' lock on it.
956  *
957  * Side Effects: find any waiting processes that are now wakable,
958  *              grant them their requested locks and awaken them.
959  *              (We have to grant the lock here to avoid a race between
960  *              the waking process and any new process to
961  *              come along and request the lock.)
962  */
963 bool
964 LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
965                         TransactionId xid, LOCKMODE lockmode)
966 {
967         LOCK       *lock;
968         LWLockId        masterLock;
969         LockMethod      lockMethodTable;
970         PROCLOCK   *proclock;
971         PROCLOCKTAG proclocktag;
972         HTAB       *proclockTable;
973         bool            wakeupNeeded = false;
974
975 #ifdef LOCK_DEBUG
976         if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks)
977                 elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
978 #endif
979
980         /* ???????? This must be changed when short term locks will be used */
981         locktag->lockmethodid = lockmethodid;
982
983         /* Record release of the lock in the current resource owner */
984         ResourceOwnerForgetLock(CurrentResourceOwner, locktag, xid, lockmode);
985
986         Assert(lockmethodid < NumLockMethods);
987         lockMethodTable = LockMethods[lockmethodid];
988         if (!lockMethodTable)
989         {
990                 elog(WARNING, "lockMethodTable is null in LockRelease");
991                 return FALSE;
992         }
993
994         masterLock = lockMethodTable->masterLock;
995         LWLockAcquire(masterLock, LW_EXCLUSIVE);
996
997         /*
998          * Find a lock with this tag
999          */
1000         Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash);
1001         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1002                                                                 (void *) locktag,
1003                                                                 HASH_FIND, NULL);
1004
1005         /*
1006          * let the caller print its own error message, too. Do not
1007          * ereport(ERROR).
1008          */
1009         if (!lock)
1010         {
1011                 LWLockRelease(masterLock);
1012                 elog(WARNING, "no such lock");
1013                 return FALSE;
1014         }
1015         LOCK_PRINT("LockRelease: found", lock, lockmode);
1016
1017         /*
1018          * Find the proclock entry for this proclock.
1019          */
1020         MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));           /* must clear padding,
1021                                                                                                                  * needed */
1022         proclocktag.lock = MAKE_OFFSET(lock);
1023         proclocktag.proc = MAKE_OFFSET(MyProc);
1024         TransactionIdStore(xid, &proclocktag.xid);
1025
1026         proclockTable = LockMethodProcLockHash[lockmethodid];
1027         proclock = (PROCLOCK *) hash_search(proclockTable,
1028                                                                                 (void *) &proclocktag,
1029                                                                                 HASH_FIND_SAVE, NULL);
1030         if (!proclock)
1031         {
1032                 LWLockRelease(masterLock);
1033 #ifdef USER_LOCKS
1034                 if (lockmethodid == USER_LOCKMETHOD)
1035                         elog(WARNING, "no lock with this tag");
1036                 else
1037 #endif
1038                         elog(WARNING, "proclock table corrupted");
1039                 return FALSE;
1040         }
1041         PROCLOCK_PRINT("LockRelease: found", proclock);
1042
1043         /*
1044          * Check that we are actually holding a lock of the type we want to
1045          * release.
1046          */
1047         if (!(proclock->holding[lockmode] > 0))
1048         {
1049                 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
1050                 Assert(proclock->holding[lockmode] >= 0);
1051                 LWLockRelease(masterLock);
1052                 elog(WARNING, "you don't own a lock of type %s",
1053                          lock_mode_names[lockmode]);
1054                 return FALSE;
1055         }
1056         Assert(proclock->nHolding > 0);
1057         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1058         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1059         Assert(lock->nGranted <= lock->nRequested);
1060
1061         /*
1062          * fix the general lock stats
1063          */
1064         lock->nRequested--;
1065         lock->requested[lockmode]--;
1066         lock->nGranted--;
1067         lock->granted[lockmode]--;
1068
1069         if (lock->granted[lockmode] == 0)
1070         {
1071                 /* change the conflict mask.  No more of this lock type. */
1072                 lock->grantMask &= LOCKBIT_OFF(lockmode);
1073         }
1074
1075         LOCK_PRINT("LockRelease: updated", lock, lockmode);
1076         Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1077         Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1078         Assert(lock->nGranted <= lock->nRequested);
1079
1080         /*
1081          * We need only run ProcLockWakeup if the released lock conflicts with
1082          * at least one of the lock types requested by waiter(s).  Otherwise
1083          * whatever conflict made them wait must still exist.  NOTE: before
1084          * MVCC, we could skip wakeup if lock->granted[lockmode] was still
1085          * positive. But that's not true anymore, because the remaining
1086          * granted locks might belong to some waiter, who could now be
1087          * awakened because he doesn't conflict with his own locks.
1088          */
1089         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1090                 wakeupNeeded = true;
1091
1092         if (lock->nRequested == 0)
1093         {
1094                 /*
1095                  * if there's no one waiting in the queue, we just released the
1096                  * last lock on this object. Delete it from the lock table.
1097                  */
1098                 Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash);
1099                 lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1100                                                                         (void *) &(lock->tag),
1101                                                                         HASH_REMOVE,
1102                                                                         NULL);
1103                 if (!lock)
1104                 {
1105                         LWLockRelease(masterLock);
1106                         elog(WARNING, "lock table corrupted");
1107                         return FALSE;
1108                 }
1109                 wakeupNeeded = false;   /* should be false, but make sure */
1110         }
1111
1112         /*
1113          * Now fix the per-proclock lock stats.
1114          */
1115         proclock->holding[lockmode]--;
1116         proclock->nHolding--;
1117         PROCLOCK_PRINT("LockRelease: updated", proclock);
1118         Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0));
1119
1120         /*
1121          * If this was my last hold on this lock, delete my entry in the
1122          * proclock table.
1123          */
1124         if (proclock->nHolding == 0)
1125         {
1126                 PROCLOCK_PRINT("LockRelease: deleting", proclock);
1127                 SHMQueueDelete(&proclock->lockLink);
1128                 SHMQueueDelete(&proclock->procLink);
1129                 proclock = (PROCLOCK *) hash_search(proclockTable,
1130                                                                                         (void *) &proclock,
1131                                                                                         HASH_REMOVE_SAVED, NULL);
1132                 if (!proclock)
1133                 {
1134                         LWLockRelease(masterLock);
1135                         elog(WARNING, "proclock table corrupted");
1136                         return FALSE;
1137                 }
1138         }
1139
1140         /*
1141          * Wake up waiters if needed.
1142          */
1143         if (wakeupNeeded)
1144                 ProcLockWakeup(lockMethodTable, lock);
1145
1146         LWLockRelease(masterLock);
1147         return TRUE;
1148 }
1149
1150 /*
1151  * LockReleaseAll -- Release all locks of the specified lock method that
1152  *              are held by the specified process.
1153  *
1154  * Well, not necessarily *all* locks.  The available behaviors are:
1155  *
1156  * allxids == true: release all locks regardless of transaction
1157  * affiliation.
1158  *
1159  * allxids == false: release all locks with Xid != 0
1160  * (zero is the Xid used for "session" locks).
1161  */
1162 bool
1163 LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc,
1164                            bool allxids)
1165 {
1166         SHM_QUEUE  *procHolders = &(proc->procHolders);
1167         PROCLOCK   *proclock;
1168         PROCLOCK   *nextHolder;
1169         LWLockId        masterLock;
1170         LockMethod      lockMethodTable;
1171         int                     i,
1172                                 numLockModes;
1173         LOCK       *lock;
1174
1175 #ifdef LOCK_DEBUG
1176         if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1177                 elog(LOG, "LockReleaseAll: lockmethod=%d, pid=%d",
1178                          lockmethodid, proc->pid);
1179 #endif
1180
1181         Assert(lockmethodid < NumLockMethods);
1182         lockMethodTable = LockMethods[lockmethodid];
1183         if (!lockMethodTable)
1184         {
1185                 elog(WARNING, "bad lock method: %d", lockmethodid);
1186                 return FALSE;
1187         }
1188
1189         numLockModes = lockMethodTable->numLockModes;
1190         masterLock = lockMethodTable->masterLock;
1191
1192         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1193
1194         proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
1195                                                                                  offsetof(PROCLOCK, procLink));
1196
1197         while (proclock)
1198         {
1199                 bool            wakeupNeeded = false;
1200
1201                 /* Get link first, since we may unlink/delete this proclock */
1202                 nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
1203                                                                                    offsetof(PROCLOCK, procLink));
1204
1205                 Assert(proclock->tag.proc == MAKE_OFFSET(proc));
1206
1207                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1208
1209                 /* Ignore items that are not of the lockmethod to be removed */
1210                 if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
1211                         goto next_item;
1212
1213                 /* Ignore locks with Xid=0 unless we are asked to release all locks */
1214                 if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
1215                         && !allxids)
1216                         goto next_item;
1217
1218                 PROCLOCK_PRINT("LockReleaseAll", proclock);
1219                 LOCK_PRINT("LockReleaseAll", lock, 0);
1220                 Assert(lock->nRequested >= 0);
1221                 Assert(lock->nGranted >= 0);
1222                 Assert(lock->nGranted <= lock->nRequested);
1223                 Assert(proclock->nHolding >= 0);
1224                 Assert(proclock->nHolding <= lock->nRequested);
1225
1226                 /*
1227                  * fix the general lock stats
1228                  */
1229                 if (lock->nRequested != proclock->nHolding)
1230                 {
1231                         for (i = 1; i <= numLockModes; i++)
1232                         {
1233                                 Assert(proclock->holding[i] >= 0);
1234                                 if (proclock->holding[i] > 0)
1235                                 {
1236                                         lock->requested[i] -= proclock->holding[i];
1237                                         lock->granted[i] -= proclock->holding[i];
1238                                         Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
1239                                         if (lock->granted[i] == 0)
1240                                                 lock->grantMask &= LOCKBIT_OFF(i);
1241
1242                                         /*
1243                                          * Read comments in LockRelease
1244                                          */
1245                                         if (!wakeupNeeded &&
1246                                                 lockMethodTable->conflictTab[i] & lock->waitMask)
1247                                                 wakeupNeeded = true;
1248                                 }
1249                         }
1250                         lock->nRequested -= proclock->nHolding;
1251                         lock->nGranted -= proclock->nHolding;
1252                         Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1253                         Assert(lock->nGranted <= lock->nRequested);
1254                 }
1255                 else
1256                 {
1257                         /*
1258                          * This proclock accounts for all the requested locks on the
1259                          * object, so we can be lazy and just zero things out.
1260                          */
1261                         lock->nRequested = 0;
1262                         lock->nGranted = 0;
1263                         /* Fix the lock status, just for next LOCK_PRINT message. */
1264                         for (i = 1; i <= numLockModes; i++)
1265                         {
1266                                 Assert(lock->requested[i] == lock->granted[i]);
1267                                 lock->requested[i] = lock->granted[i] = 0;
1268                         }
1269                 }
1270                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1271
1272                 PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
1273
1274                 /*
1275                  * Remove the proclock entry from the linked lists
1276                  */
1277                 SHMQueueDelete(&proclock->lockLink);
1278                 SHMQueueDelete(&proclock->procLink);
1279
1280                 /*
1281                  * remove the proclock entry from the hashtable
1282                  */
1283                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
1284                                                                                         (void *) proclock,
1285                                                                                         HASH_REMOVE,
1286                                                                                         NULL);
1287                 if (!proclock)
1288                 {
1289                         LWLockRelease(masterLock);
1290                         elog(WARNING, "proclock table corrupted");
1291                         return FALSE;
1292                 }
1293
1294                 if (lock->nRequested == 0)
1295                 {
1296                         /*
1297                          * We've just released the last lock, so garbage-collect the
1298                          * lock object.
1299                          */
1300                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1301                         Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash);
1302                         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1303                                                                                 (void *) &(lock->tag),
1304                                                                                 HASH_REMOVE, NULL);
1305                         if (!lock)
1306                         {
1307                                 LWLockRelease(masterLock);
1308                                 elog(WARNING, "cannot remove lock from HTAB");
1309                                 return FALSE;
1310                         }
1311                 }
1312                 else if (wakeupNeeded)
1313                         ProcLockWakeup(lockMethodTable, lock);
1314
1315 next_item:
1316                 proclock = nextHolder;
1317         }
1318
1319         LWLockRelease(masterLock);
1320
1321 #ifdef LOCK_DEBUG
1322         if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1323                 elog(LOG, "LockReleaseAll done");
1324 #endif
1325
1326         return TRUE;
1327 }
1328
1329 int
1330 LockShmemSize(int maxBackends)
1331 {
1332         int                     size = 0;
1333         long            max_table_size = NLOCKENTS(maxBackends);
1334
1335         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1336         size += maxBackends * MAXALIGN(sizeof(PGPROC));         /* each MyProc */
1337         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));            /* each lock method */
1338
1339         /* lockHash table */
1340         size += hash_estimate_size(max_table_size, sizeof(LOCK));
1341
1342         /* proclockHash table */
1343         size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
1344
1345         /*
1346          * Since the lockHash entry count above is only an estimate, add 10%
1347          * safety margin.
1348          */
1349         size += size / 10;
1350
1351         return size;
1352 }
1353
1354 /*
1355  * GetLockStatusData - Return a summary of the lock manager's internal
1356  * status, for use in a user-level reporting function.
1357  *
1358  * The return data consists of an array of PROCLOCK objects, with the
1359  * associated PGPROC and LOCK objects for each.  Note that multiple
1360  * copies of the same PGPROC and/or LOCK objects are likely to appear.
1361  * It is the caller's responsibility to match up duplicates if wanted.
1362  *
1363  * The design goal is to hold the LockMgrLock for as short a time as possible;
1364  * thus, this function simply makes a copy of the necessary data and releases
1365  * the lock, allowing the caller to contemplate and format the data for as
1366  * long as it pleases.
1367  */
1368 LockData *
1369 GetLockStatusData(void)
1370 {
1371         LockData   *data;
1372         HTAB       *proclockTable;
1373         PROCLOCK   *proclock;
1374         HASH_SEQ_STATUS seqstat;
1375         int                     i;
1376
1377         data = (LockData *) palloc(sizeof(LockData));
1378
1379         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
1380
1381         proclockTable = LockMethodProcLockHash[DEFAULT_LOCKMETHOD];
1382
1383         data->nelements = i = proclockTable->hctl->nentries;
1384
1385         data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
1386         data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
1387         data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
1388         data->locks = (LOCK *) palloc(sizeof(LOCK) * i);
1389
1390         hash_seq_init(&seqstat, proclockTable);
1391
1392         i = 0;
1393         while ((proclock = hash_seq_search(&seqstat)))
1394         {
1395                 PGPROC     *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
1396                 LOCK       *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1397
1398                 data->proclockaddrs[i] = MAKE_OFFSET(proclock);
1399                 memcpy(&(data->proclocks[i]), proclock, sizeof(PROCLOCK));
1400                 memcpy(&(data->procs[i]), proc, sizeof(PGPROC));
1401                 memcpy(&(data->locks[i]), lock, sizeof(LOCK));
1402
1403                 i++;
1404         }
1405
1406         LWLockRelease(LockMgrLock);
1407
1408         Assert(i == data->nelements);
1409
1410         return data;
1411 }
1412
1413 /* Provide the textual name of any lock mode */
1414 const char *
1415 GetLockmodeName(LOCKMODE mode)
1416 {
1417         Assert(mode <= MAX_LOCKMODES);
1418         return lock_mode_names[mode];
1419 }
1420
1421 #ifdef LOCK_DEBUG
1422 /*
1423  * Dump all locks in the proc->procHolders list.
1424  *
1425  * Must have already acquired the masterLock.
1426  */
1427 void
1428 DumpLocks(void)
1429 {
1430         PGPROC     *proc;
1431         SHM_QUEUE  *procHolders;
1432         PROCLOCK   *proclock;
1433         LOCK       *lock;
1434         int                     lockmethodid = DEFAULT_LOCKMETHOD;
1435         LockMethod      lockMethodTable;
1436
1437         proc = MyProc;
1438         if (proc == NULL)
1439                 return;
1440
1441         procHolders = &proc->procHolders;
1442
1443         Assert(lockmethodid < NumLockMethods);
1444         lockMethodTable = LockMethods[lockmethodid];
1445         if (!lockMethodTable)
1446                 return;
1447
1448         if (proc->waitLock)
1449                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1450
1451         proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders,
1452                                                                                  offsetof(PROCLOCK, procLink));
1453
1454         while (proclock)
1455         {
1456                 Assert(proclock->tag.proc == MAKE_OFFSET(proc));
1457
1458                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1459
1460                 PROCLOCK_PRINT("DumpLocks", proclock);
1461                 LOCK_PRINT("DumpLocks", lock, 0);
1462
1463                 proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink,
1464                                                                                    offsetof(PROCLOCK, procLink));
1465         }
1466 }
1467
1468 /*
1469  * Dump all postgres locks. Must have already acquired the masterLock.
1470  */
1471 void
1472 DumpAllLocks(void)
1473 {
1474         PGPROC     *proc;
1475         PROCLOCK   *proclock;
1476         LOCK       *lock;
1477         int                     lockmethodid = DEFAULT_LOCKMETHOD;
1478         LockMethod      lockMethodTable;
1479         HTAB       *proclockTable;
1480         HASH_SEQ_STATUS status;
1481
1482         proc = MyProc;
1483         if (proc == NULL)
1484                 return;
1485
1486         Assert(lockmethodid < NumLockMethods);
1487         lockMethodTable = LockMethods[lockmethodid];
1488         if (!lockMethodTable)
1489                 return;
1490
1491         proclockTable = LockMethodProcLockHash[lockmethodid];
1492
1493         if (proc->waitLock)
1494                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1495
1496         hash_seq_init(&status, proclockTable);
1497         while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
1498         {
1499                 PROCLOCK_PRINT("DumpAllLocks", proclock);
1500
1501                 if (proclock->tag.lock)
1502                 {
1503                         lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1504                         LOCK_PRINT("DumpAllLocks", lock, 0);
1505                 }
1506                 else
1507                         elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
1508         }
1509 }
1510
1511 #endif   /* LOCK_DEBUG */