]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Restructure LOCKTAG as per discussions of a couple months ago.
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.150 2005/04/29 22:28:24 tgl Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll(),
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <signal.h>
34 #include <unistd.h>
35
36 #include "access/xact.h"
37 #include "miscadmin.h"
38 #include "storage/proc.h"
39 #include "utils/memutils.h"
40 #include "utils/ps_status.h"
41 #include "utils/resowner.h"
42
43
/* This configuration variable is used to set the lock table size */
int			max_locks_per_xact; /* set by guc.c */

/*
 * Maximum number of entries requested for each shared lock hash table:
 * max_locks_per_xact entries for every one of maxBackends backends.
 */
#define NLOCKENTS(maxBackends)	(max_locks_per_xact * (maxBackends))
48
49
/*
 * map from lock method id to the lock table data structures
 *
 * All four arrays are indexed by LOCKMETHODID and are filled in by
 * LockMethodTableInit() and LockMethodTableRename().
 */
static LockMethod LockMethods[MAX_LOCK_METHODS];	/* lock method headers */
static HTAB *LockMethodLockHash[MAX_LOCK_METHODS];	/* LOCK entries, keyed by LOCKTAG */
static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS];	/* PROCLOCK entries, keyed by PROCLOCKTAG */
static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS];	/* non-shared LOCALLOCK entries */

/*
 * exported so lmgr.c can initialize it; within this file it is the next
 * lock method id to be handed out
 */
int			NumLockMethods;


/* private state for GrantAwaitedLock */
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;
65
66
/*
 * Printable names of the lock modes, indexed by LOCKMODE value.  Slot 0 is
 * a placeholder.  Used when building log messages in this file.
 */
static const char *const lock_mode_names[] =
{
	"INVALID",
	"AccessShareLock",
	"RowShareLock",
	"RowExclusiveLock",
	"ShareUpdateExclusiveLock",
	"ShareLock",
	"ShareRowExclusiveLock",
	"ExclusiveLock",
	"AccessExclusiveLock"
};
79
80
81 #ifdef LOCK_DEBUG
82
83 /*------
84  * The following configuration options are available for lock debugging:
85  *
86  *         TRACE_LOCKS          -- give a bunch of output what's going on in this file
87  *         TRACE_USERLOCKS      -- same but for user locks
88  *         TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
89  *                                                 (use to avoid output on system tables)
90  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
91  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
92  *
93  * Furthermore, but in storage/lmgr/lwlock.c:
94  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
95  *
96  * Define LOCK_DEBUG at compile time to get all these enabled.
97  * --------
98  */
99
/* Run-time switches consulted by the tracing helpers below */
int			Trace_lock_oidmin = FirstNormalObjectId;	/* skip locks whose locktag_field2 is below this */
bool		Trace_locks = false;	/* trace locks of the default lock method */
bool		Trace_userlocks = false;	/* trace locks of the user lock method */
int			Trace_lock_table = 0;	/* if nonzero, always trace locks whose locktag_field2 equals this */
bool		Debug_deadlocks = false;	/* see DEBUG_DEADLOCKS in the option list above */
105
106
107 inline static bool
108 LOCK_DEBUG_ENABLED(const LOCK *lock)
109 {
110         return
111                 (((Trace_locks && LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD)
112                   || (Trace_userlocks && LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD))
113                  && ((Oid) lock->tag.locktag_field2 >= (Oid) Trace_lock_oidmin))
114                 || (Trace_lock_table
115                         && (lock->tag.locktag_field2 == Trace_lock_table));
116 }
117
118
119 inline static void
120 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
121 {
122         if (LOCK_DEBUG_ENABLED(lock))
123                 elog(LOG,
124                          "%s: lock(%lx) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
125                          "req(%d,%d,%d,%d,%d,%d,%d)=%d "
126                          "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
127                          where, MAKE_OFFSET(lock),
128                          lock->tag.locktag_field1, lock->tag.locktag_field2,
129                          lock->tag.locktag_field3, lock->tag.locktag_field4,
130                          lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
131                          lock->grantMask,
132                          lock->requested[1], lock->requested[2], lock->requested[3],
133                          lock->requested[4], lock->requested[5], lock->requested[6],
134                          lock->requested[7], lock->nRequested,
135                          lock->granted[1], lock->granted[2], lock->granted[3],
136                          lock->granted[4], lock->granted[5], lock->granted[6],
137                          lock->granted[7], lock->nGranted,
138                          lock->waitProcs.size, lock_mode_names[type]);
139 }
140
141
142 inline static void
143 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
144 {
145         if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock)))
146                 elog(LOG,
147                 "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)",
148                          where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
149                          PROCLOCK_LOCKMETHOD(*(proclockP)),
150                          proclockP->tag.proc, proclockP->tag.xid,
151                          (int) proclockP->holdMask);
152 }
153
154 #else                                                   /* not LOCK_DEBUG */
155
/* Without LOCK_DEBUG the tracing calls expand to nothing */
#define LOCK_PRINT(where, lock, type)
#define PROCLOCK_PRINT(where, proclockP)
158 #endif   /* not LOCK_DEBUG */
159
160
/* Forward declarations of file-local routines */
static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
		   ResourceOwner owner);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
				 int *myHolding);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
						PROCLOCK *proclock, LockMethod lockMethodTable);
169
170
/*
 * InitLocks -- initialize the lock module.
 *
 * There is currently nothing to set up, so this routine does no work; it
 * exists so callers have a fixed initialization entry point.
 */
void
InitLocks(void)
{
	return;						/* nothing to initialize */
}
180
181
182 /*
183  * Fetch the lock method table associated with a given lock
184  */
185 LockMethod
186 GetLocksMethodTable(LOCK *lock)
187 {
188         LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
189
190         Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
191         return LockMethods[lockmethodid];
192 }
193
194
195 /*
196  * LockMethodInit -- initialize the lock table's lock type
197  *              structures
198  *
199  * Notes: just copying.  Should only be called once.
200  */
201 static void
202 LockMethodInit(LockMethod lockMethodTable,
203                            const LOCKMASK *conflictsP,
204                            int numModes)
205 {
206         int                     i;
207
208         lockMethodTable->numLockModes = numModes;
209         /* copies useless zero element as well as the N lockmodes */
210         for (i = 0; i <= numModes; i++)
211                 lockMethodTable->conflictTab[i] = conflictsP[i];
212 }
213
214 /*
215  * LockMethodTableInit -- initialize a lock table structure
216  *
217  * NOTE: data structures allocated here are allocated permanently, using
218  * TopMemoryContext and shared memory.  We don't ever release them anyway,
219  * and in normal multi-backend operation the lock table structures set up
220  * by the postmaster are inherited by each backend, so they must be in
221  * TopMemoryContext.
222  */
223 LOCKMETHODID
224 LockMethodTableInit(const char *tabName,
225                                         const LOCKMASK *conflictsP,
226                                         int numModes,
227                                         int maxBackends)
228 {
229         LockMethod      newLockMethod;
230         LOCKMETHODID lockmethodid;
231         char       *shmemName;
232         HASHCTL         info;
233         int                     hash_flags;
234         bool            found;
235         long            init_table_size,
236                                 max_table_size;
237
238         if (numModes >= MAX_LOCKMODES)
239                 elog(ERROR, "too many lock types %d (limit is %d)",
240                          numModes, MAX_LOCKMODES - 1);
241
242         /* Compute init/max size to request for lock hashtables */
243         max_table_size = NLOCKENTS(maxBackends);
244         init_table_size = max_table_size / 2;
245
246         /* Allocate a string for the shmem index table lookups. */
247         /* This is just temp space in this routine, so palloc is OK. */
248         shmemName = (char *) palloc(strlen(tabName) + 32);
249
250         /* each lock table has a header in shared memory */
251         sprintf(shmemName, "%s (lock method table)", tabName);
252         newLockMethod = (LockMethod)
253                 ShmemInitStruct(shmemName, sizeof(LockMethodData), &found);
254
255         if (!newLockMethod)
256                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
257
258         /*
259          * we're first - initialize
260          */
261         if (!found)
262         {
263                 MemSet(newLockMethod, 0, sizeof(LockMethodData));
264                 newLockMethod->masterLock = LockMgrLock;
265                 LockMethodInit(newLockMethod, conflictsP, numModes);
266         }
267
268         /*
269          * other modules refer to the lock table by a lockmethod ID
270          */
271         Assert(NumLockMethods < MAX_LOCK_METHODS);
272         lockmethodid = NumLockMethods++;
273         LockMethods[lockmethodid] = newLockMethod;
274
275         /*
276          * allocate a hash table for LOCK structs.      This is used to store
277          * per-locked-object information.
278          */
279         MemSet(&info, 0, sizeof(info));
280         info.keysize = sizeof(LOCKTAG);
281         info.entrysize = sizeof(LOCK);
282         info.hash = tag_hash;
283         hash_flags = (HASH_ELEM | HASH_FUNCTION);
284
285         sprintf(shmemName, "%s (lock hash)", tabName);
286         LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName,
287                                                                                                          init_table_size,
288                                                                                                          max_table_size,
289                                                                                                          &info,
290                                                                                                          hash_flags);
291
292         if (!LockMethodLockHash[lockmethodid])
293                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
294
295         /*
296          * allocate a hash table for PROCLOCK structs.  This is used to store
297          * per-lock-holder information.
298          */
299         info.keysize = sizeof(PROCLOCKTAG);
300         info.entrysize = sizeof(PROCLOCK);
301         info.hash = tag_hash;
302         hash_flags = (HASH_ELEM | HASH_FUNCTION);
303
304         sprintf(shmemName, "%s (proclock hash)", tabName);
305         LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName,
306                                                                                                                  init_table_size,
307                                                                                                                  max_table_size,
308                                                                                                                  &info,
309                                                                                                                  hash_flags);
310
311         if (!LockMethodProcLockHash[lockmethodid])
312                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
313
314         /*
315          * allocate a non-shared hash table for LOCALLOCK structs.      This is
316          * used to store lock counts and resource owner information.
317          *
318          * The non-shared table could already exist in this process (this occurs
319          * when the postmaster is recreating shared memory after a backend
320          * crash). If so, delete and recreate it.  (We could simply leave it,
321          * since it ought to be empty in the postmaster, but for safety let's
322          * zap it.)
323          */
324         if (LockMethodLocalHash[lockmethodid])
325                 hash_destroy(LockMethodLocalHash[lockmethodid]);
326
327         info.keysize = sizeof(LOCALLOCKTAG);
328         info.entrysize = sizeof(LOCALLOCK);
329         info.hash = tag_hash;
330         hash_flags = (HASH_ELEM | HASH_FUNCTION);
331
332         sprintf(shmemName, "%s (locallock hash)", tabName);
333         LockMethodLocalHash[lockmethodid] = hash_create(shmemName,
334                                                                                                         128,
335                                                                                                         &info,
336                                                                                                         hash_flags);
337
338         pfree(shmemName);
339
340         return lockmethodid;
341 }
342
343 /*
344  * LockMethodTableRename -- allocate another lockmethod ID to the same
345  *              lock table.
346  *
347  * NOTES: This function makes it possible to have different lockmethodids,
348  *              and hence different locking semantics, while still storing all
349  *              the data in one shared-memory hashtable.
350  */
351
352 LOCKMETHODID
353 LockMethodTableRename(LOCKMETHODID lockmethodid)
354 {
355         LOCKMETHODID newLockMethodId;
356
357         if (NumLockMethods >= MAX_LOCK_METHODS)
358                 return INVALID_LOCKMETHOD;
359         if (LockMethods[lockmethodid] == INVALID_LOCKMETHOD)
360                 return INVALID_LOCKMETHOD;
361
362         /* other modules refer to the lock table by a lockmethod ID */
363         newLockMethodId = NumLockMethods;
364         NumLockMethods++;
365
366         LockMethods[newLockMethodId] = LockMethods[lockmethodid];
367         LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid];
368         LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid];
369         LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid];
370
371         return newLockMethodId;
372 }
373
374 /*
375  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
376  *              set lock if/when no conflicts.
377  *
378  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
379  *              a FALSE return is to be expected if dontWait is TRUE;
380  *              but if dontWait is FALSE, only a parameter error can cause
381  *              a FALSE return.  (XXX probably we should just ereport on parameter
382  *              errors, instead of conflating this with failure to acquire lock?)
383  *
384  * Side Effects: The lock is acquired and recorded in lock tables.
385  *
386  * NOTE: if we wait for the lock, there is no way to abort the wait
387  * short of aborting the transaction.
388  *
389  *
390  * Note on User Locks:
391  *
392  *              User locks are handled totally on the application side as
393  *              long term cooperative locks which extend beyond the normal
394  *              transaction boundaries.  Their purpose is to indicate to an
395  *              application that someone is `working' on an item.  So it is
396  *              possible to put an user lock on a tuple's oid, retrieve the
397  *              tuple, work on it for an hour and then update it and remove
398  *              the lock.  While the lock is active other clients can still
399  *              read and write the tuple but they can be aware that it has
400  *              been locked at the application level by someone.
401  *
402  *              User locks and normal locks are completely orthogonal and
403  *              they don't interfere with each other.
404  *
405  *              User locks are always non blocking, therefore they are never
406  *              acquired if already held by another process.  They must be
407  *              released explicitly by the application but they are released
408  *              automatically when a backend terminates.
409  *              They are indicated by a lockmethod 2 which is an alias for the
410  *              normal lock table.
411  *
412  *              The lockmode parameter can have the same values for normal locks
413  *              although probably only WRITE_LOCK can have some practical use.
414  *
415  *                                                                                                              DZ - 22 Nov 1997
416  */
417
418 bool
419 LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
420                         TransactionId xid, LOCKMODE lockmode, bool dontWait)
421 {
422         LOCALLOCKTAG localtag;
423         LOCALLOCK  *locallock;
424         LOCK       *lock;
425         PROCLOCK   *proclock;
426         PROCLOCKTAG proclocktag;
427         bool            found;
428         ResourceOwner owner;
429         LWLockId        masterLock;
430         LockMethod      lockMethodTable;
431         int                     status;
432         int                     myHolding[MAX_LOCKMODES];
433         int                     i;
434
435 #ifdef LOCK_DEBUG
436         if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
437                 elog(LOG, "LockAcquire: user lock [%u,%u] %s",
438                          locktag->locktag_field1, locktag->locktag_field2,
439                          lock_mode_names[lockmode]);
440 #endif
441
442         /* ugly */
443         locktag->locktag_lockmethodid = lockmethodid;
444
445         Assert(lockmethodid < NumLockMethods);
446         lockMethodTable = LockMethods[lockmethodid];
447         if (!lockMethodTable)
448         {
449                 elog(WARNING, "bad lock table id: %d", lockmethodid);
450                 return FALSE;
451         }
452
453         /* Session locks and user locks are not transactional */
454         if (xid != InvalidTransactionId &&
455                 lockmethodid == DEFAULT_LOCKMETHOD)
456                 owner = CurrentResourceOwner;
457         else
458                 owner = NULL;
459
460         /*
461          * Find or create a LOCALLOCK entry for this lock and lockmode
462          */
463         MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
464         localtag.lock = *locktag;
465         localtag.xid = xid;
466         localtag.mode = lockmode;
467
468         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
469                                                                                   (void *) &localtag,
470                                                                                   HASH_ENTER, &found);
471         if (!locallock)
472                 ereport(ERROR,
473                                 (errcode(ERRCODE_OUT_OF_MEMORY),
474                                  errmsg("out of memory")));
475
476         /*
477          * if it's a new locallock object, initialize it
478          */
479         if (!found)
480         {
481                 locallock->lock = NULL;
482                 locallock->proclock = NULL;
483                 locallock->nLocks = 0;
484                 locallock->numLockOwners = 0;
485                 locallock->maxLockOwners = 8;
486                 locallock->lockOwners = NULL;
487                 locallock->lockOwners = (LOCALLOCKOWNER *)
488                         MemoryContextAlloc(TopMemoryContext,
489                                           locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
490         }
491         else
492         {
493                 /* Make sure there will be room to remember the lock */
494                 if (locallock->numLockOwners >= locallock->maxLockOwners)
495                 {
496                         int                     newsize = locallock->maxLockOwners * 2;
497
498                         locallock->lockOwners = (LOCALLOCKOWNER *)
499                                 repalloc(locallock->lockOwners,
500                                                  newsize * sizeof(LOCALLOCKOWNER));
501                         locallock->maxLockOwners = newsize;
502                 }
503         }
504
505         /*
506          * If we already hold the lock, we can just increase the count
507          * locally.
508          */
509         if (locallock->nLocks > 0)
510         {
511                 GrantLockLocal(locallock, owner);
512                 return TRUE;
513         }
514
515         /*
516          * Otherwise we've got to mess with the shared lock table.
517          */
518         masterLock = lockMethodTable->masterLock;
519
520         LWLockAcquire(masterLock, LW_EXCLUSIVE);
521
522         /*
523          * Find or create a lock with this tag.
524          *
525          * Note: if the locallock object already existed, it might have a pointer
526          * to the lock already ... but we probably should not assume that that
527          * pointer is valid, since a lock object with no locks can go away
528          * anytime.
529          */
530         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
531                                                                 (void *) locktag,
532                                                                 HASH_ENTER, &found);
533         if (!lock)
534         {
535                 LWLockRelease(masterLock);
536                 ereport(ERROR,
537                                 (errcode(ERRCODE_OUT_OF_MEMORY),
538                                  errmsg("out of shared memory"),
539                 errhint("You may need to increase max_locks_per_transaction.")));
540         }
541         locallock->lock = lock;
542
543         /*
544          * if it's a new lock object, initialize it
545          */
546         if (!found)
547         {
548                 lock->grantMask = 0;
549                 lock->waitMask = 0;
550                 SHMQueueInit(&(lock->procLocks));
551                 ProcQueueInit(&(lock->waitProcs));
552                 lock->nRequested = 0;
553                 lock->nGranted = 0;
554                 MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
555                 MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
556                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
557         }
558         else
559         {
560                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
561                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
562                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
563                 Assert(lock->nGranted <= lock->nRequested);
564         }
565
566         /*
567          * Create the hash key for the proclock table.
568          */
569         MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));           /* must clear padding */
570         proclocktag.lock = MAKE_OFFSET(lock);
571         proclocktag.proc = MAKE_OFFSET(MyProc);
572         TransactionIdStore(xid, &proclocktag.xid);
573
574         /*
575          * Find or create a proclock entry with this tag
576          */
577         proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
578                                                                                 (void *) &proclocktag,
579                                                                                 HASH_ENTER, &found);
580         if (!proclock)
581         {
582                 /* Ooops, not enough shmem for the proclock */
583                 if (lock->nRequested == 0)
584                 {
585                         /*
586                          * There are no other requestors of this lock, so garbage-collect
587                          * the lock object.  We *must* do this to avoid a permanent leak
588                          * of shared memory, because there won't be anything to cause
589                          * anyone to release the lock object later.
590                          */
591                         Assert(SHMQueueEmpty(&(lock->procLocks)));
592                         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
593                                                                                 (void *) &(lock->tag),
594                                                                                 HASH_REMOVE, NULL);
595                 }
596                 LWLockRelease(masterLock);
597                 if (!lock)                              /* hash remove failed? */
598                         elog(WARNING, "lock table corrupted");
599                 ereport(ERROR,
600                                 (errcode(ERRCODE_OUT_OF_MEMORY),
601                                  errmsg("out of shared memory"),
602                 errhint("You may need to increase max_locks_per_transaction.")));
603         }
604         locallock->proclock = proclock;
605
606         /*
607          * If new, initialize the new entry
608          */
609         if (!found)
610         {
611                 proclock->holdMask = 0;
612                 /* Add proclock to appropriate lists */
613                 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
614                 SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
615                 PROCLOCK_PRINT("LockAcquire: new", proclock);
616         }
617         else
618         {
619                 PROCLOCK_PRINT("LockAcquire: found", proclock);
620                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
621
622 #ifdef CHECK_DEADLOCK_RISK
623
624                 /*
625                  * Issue warning if we already hold a lower-level lock on this
626                  * object and do not hold a lock of the requested level or higher.
627                  * This indicates a deadlock-prone coding practice (eg, we'd have
628                  * a deadlock if another backend were following the same code path
629                  * at about the same time).
630                  *
631                  * This is not enabled by default, because it may generate log
632                  * entries about user-level coding practices that are in fact safe
633                  * in context. It can be enabled to help find system-level
634                  * problems.
635                  *
636                  * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
637                  * better to use a table.  For now, though, this works.
638                  */
639                 for (i = lockMethodTable->numLockModes; i > 0; i--)
640                 {
641                         if (proclock->holdMask & LOCKBIT_ON(i))
642                         {
643                                 if (i >= (int) lockmode)
644                                         break;          /* safe: we have a lock >= req level */
645                                 elog(LOG, "deadlock risk: raising lock level"
646                                          " from %s to %s on object %u/%u/%u",
647                                          lock_mode_names[i], lock_mode_names[lockmode],
648                                  lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
649                                 break;
650                         }
651                 }
652 #endif   /* CHECK_DEADLOCK_RISK */
653         }
654
655         /*
656          * lock->nRequested and lock->requested[] count the total number of
657          * requests, whether granted or waiting, so increment those
658          * immediately. The other counts don't increment till we get the lock.
659          */
660         lock->nRequested++;
661         lock->requested[lockmode]++;
662         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
663
664         /*
665          * If this process (under any XID) is a holder of the lock, just grant
666          * myself another one without blocking.
667          */
668         LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
669         if (myHolding[lockmode] > 0)
670         {
671                 GrantLock(lock, proclock, lockmode);
672                 GrantLockLocal(locallock, owner);
673                 PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
674                 LWLockRelease(masterLock);
675                 return TRUE;
676         }
677
678         /*
679          * If lock requested conflicts with locks requested by waiters, must
680          * join wait queue.  Otherwise, check for conflict with already-held
681          * locks.  (That's last because most complex check.)
682          */
683         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
684                 status = STATUS_FOUND;
685         else
686                 status = LockCheckConflicts(lockMethodTable, lockmode,
687                                                                         lock, proclock,
688                                                                         MyProc, myHolding);
689
690         if (status == STATUS_OK)
691         {
692                 /* No conflict with held or previously requested locks */
693                 GrantLock(lock, proclock, lockmode);
694                 GrantLockLocal(locallock, owner);
695         }
696         else
697         {
698                 Assert(status == STATUS_FOUND);
699
700                 /*
701                  * We can't acquire the lock immediately.  If caller specified no
702                  * blocking, remove useless table entries and return FALSE without
703                  * waiting.
704                  */
705                 if (dontWait)
706                 {
707                         if (proclock->holdMask == 0)
708                         {
709                                 SHMQueueDelete(&proclock->lockLink);
710                                 SHMQueueDelete(&proclock->procLink);
711                                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
712                                                                                            (void *) &(proclock->tag),
713                                                                                                         HASH_REMOVE, NULL);
714                                 if (!proclock)
715                                         elog(WARNING, "proclock table corrupted");
716                         }
717                         else
718                                 PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
719                         lock->nRequested--;
720                         lock->requested[lockmode]--;
721                         LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
722                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
723                         Assert(lock->nGranted <= lock->nRequested);
724                         LWLockRelease(masterLock);
725                         if (locallock->nLocks == 0)
726                                 RemoveLocalLock(locallock);
727                         return FALSE;
728                 }
729
730                 /*
731                  * Construct bitmask of locks this process holds on this object.
732                  */
733                 {
734                         LOCKMASK        heldLocks = 0;
735
736                         for (i = 1; i <= lockMethodTable->numLockModes; i++)
737                         {
738                                 if (myHolding[i] > 0)
739                                         heldLocks |= LOCKBIT_ON(i);
740                         }
741                         MyProc->heldLocks = heldLocks;
742                 }
743
744                 /*
745                  * Sleep till someone wakes me up.
746                  */
747                 status = WaitOnLock(lockmethodid, locallock, owner);
748
749                 /*
750                  * NOTE: do not do any material change of state between here and
751                  * return.      All required changes in locktable state must have been
752                  * done when the lock was granted to us --- see notes in
753                  * WaitOnLock.
754                  */
755
756                 /*
757                  * Check the proclock entry status, in case something in the ipc
758                  * communication doesn't work correctly.
759                  */
760                 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
761                 {
762                         PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
763                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
764                         /* Should we retry ? */
765                         LWLockRelease(masterLock);
766                         return FALSE;
767                 }
768                 PROCLOCK_PRINT("LockAcquire: granted", proclock);
769                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
770         }
771
772         LWLockRelease(masterLock);
773
774         return status == STATUS_OK;
775 }
776
777 /*
778  * Subroutine to free a locallock entry
779  */
780 static void
781 RemoveLocalLock(LOCALLOCK *locallock)
782 {
783         LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
784
785         pfree(locallock->lockOwners);
786         locallock->lockOwners = NULL;
787         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
788                                                                                   (void *) &(locallock->tag),
789                                                                                   HASH_REMOVE, NULL);
790         if (!locallock)
791                 elog(WARNING, "locallock table corrupted");
792 }
793
794 /*
795  * LockCheckConflicts -- test whether requested lock conflicts
796  *              with those already granted
797  *
798  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
799  *
800  * NOTES:
801  *              Here's what makes this complicated: one process's locks don't
802  * conflict with one another, even if they are held under different
803  * transaction IDs (eg, session and xact locks do not conflict).
804  * So, we must subtract off our own locks when determining whether the
805  * requested new lock conflicts with those already held.
806  *
807  * The caller can optionally pass the process's total holding counts, if
808  * known.  If NULL is passed then these values will be computed internally.
809  */
810 int
811 LockCheckConflicts(LockMethod lockMethodTable,
812                                    LOCKMODE lockmode,
813                                    LOCK *lock,
814                                    PROCLOCK *proclock,
815                                    PGPROC *proc,
816                                    int *myHolding)              /* myHolding[] array or NULL */
817 {
818         int                     numLockModes = lockMethodTable->numLockModes;
819         LOCKMASK        bitmask;
820         int                     i;
821         int                     localHolding[MAX_LOCKMODES];
822
823         /*
824          * first check for global conflicts: If no locks conflict with my
825          * request, then I get the lock.
826          *
827          * Checking for conflict: lock->grantMask represents the types of
828          * currently held locks.  conflictTable[lockmode] has a bit set for
829          * each type of lock that conflicts with request.       Bitwise compare
830          * tells if there is a conflict.
831          */
832         if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
833         {
834                 PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
835                 return STATUS_OK;
836         }
837
838         /*
839          * Rats.  Something conflicts. But it could still be my own lock.  We
840          * have to construct a conflict mask that does not reflect our own
841          * locks.  Locks held by the current process under another XID also
842          * count as "our own locks".
843          */
844         if (myHolding == NULL)
845         {
846                 /* Caller didn't do calculation of total holding for me */
847                 LockCountMyLocks(proclock->tag.lock, proc, localHolding);
848                 myHolding = localHolding;
849         }
850
851         /* Compute mask of lock types held by other processes */
852         bitmask = 0;
853         for (i = 1; i <= numLockModes; i++)
854         {
855                 if (lock->granted[i] != myHolding[i])
856                         bitmask |= LOCKBIT_ON(i);
857         }
858
859         /*
860          * now check again for conflicts.  'bitmask' describes the types of
861          * locks held by other processes.  If one of these conflicts with the
862          * kind of lock that I want, there is a conflict and I have to sleep.
863          */
864         if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
865         {
866                 /* no conflict. OK to get the lock */
867                 PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
868                 return STATUS_OK;
869         }
870
871         PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
872         return STATUS_FOUND;
873 }
874
875 /*
876  * LockCountMyLocks --- Count total number of locks held on a given lockable
877  *              object by a given process (under any transaction ID).
878  *
879  * XXX This could be rather slow if the process holds a large number of locks.
880  * Perhaps it could be sped up if we kept yet a third hashtable of per-
881  * process lock information.  However, for the normal case where a transaction
882  * doesn't hold a large number of locks, keeping such a table would probably
883  * be a net slowdown.
884  */
885 static void
886 LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
887 {
888         SHM_QUEUE  *procLocks = &(proc->procLocks);
889         PROCLOCK   *proclock;
890
891         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
892
893         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
894                                                                                  offsetof(PROCLOCK, procLink));
895
896         while (proclock)
897         {
898                 if (lockOffset == proclock->tag.lock)
899                 {
900                         LOCKMASK        holdMask = proclock->holdMask;
901                         int                     i;
902
903                         for (i = 1; i < MAX_LOCKMODES; i++)
904                         {
905                                 if (holdMask & LOCKBIT_ON(i))
906                                         myHolding[i]++;
907                         }
908                 }
909
910                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
911                                                                                    offsetof(PROCLOCK, procLink));
912         }
913 }
914
915 /*
916  * GrantLock -- update the lock and proclock data structures to show
917  *              the lock request has been granted.
918  *
919  * NOTE: if proc was blocked, it also needs to be removed from the wait list
920  * and have its waitLock/waitProcLock fields cleared.  That's not done here.
921  *
922  * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
923  * table entry; but since we may be awaking some other process, we can't do
924  * that here; it's done by GrantLockLocal, instead.
925  */
926 void
927 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
928 {
929         lock->nGranted++;
930         lock->granted[lockmode]++;
931         lock->grantMask |= LOCKBIT_ON(lockmode);
932         if (lock->granted[lockmode] == lock->requested[lockmode])
933                 lock->waitMask &= LOCKBIT_OFF(lockmode);
934         proclock->holdMask |= LOCKBIT_ON(lockmode);
935         LOCK_PRINT("GrantLock", lock, lockmode);
936         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
937         Assert(lock->nGranted <= lock->nRequested);
938 }
939
940 /*
941  * UnGrantLock -- opposite of GrantLock. 
942  *
943  * Updates the lock and proclock data structures to show that the lock
944  * is no longer held nor requested by the current holder.
945  *
946  * Returns true if there were any waiters waiting on the lock that
947  * should now be woken up with ProcLockWakeup.
948  */
949 static bool
950 UnGrantLock(LOCK *lock, LOCKMODE lockmode,
951                         PROCLOCK *proclock, LockMethod lockMethodTable)
952 {
953         bool wakeupNeeded = false;
954
955         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
956         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
957         Assert(lock->nGranted <= lock->nRequested);
958
959         /*
960          * fix the general lock stats
961          */
962         lock->nRequested--;
963         lock->requested[lockmode]--;
964         lock->nGranted--;
965         lock->granted[lockmode]--;
966
967         if (lock->granted[lockmode] == 0)
968         {
969                 /* change the conflict mask.  No more of this lock type. */
970                 lock->grantMask &= LOCKBIT_OFF(lockmode);
971         }
972
973         LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
974
975         /*
976          * We need only run ProcLockWakeup if the released lock conflicts with
977          * at least one of the lock types requested by waiter(s).  Otherwise
978          * whatever conflict made them wait must still exist.  NOTE: before
979          * MVCC, we could skip wakeup if lock->granted[lockmode] was still
980          * positive. But that's not true anymore, because the remaining
981          * granted locks might belong to some waiter, who could now be
982          * awakened because he doesn't conflict with his own locks.
983          */
984         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
985                 wakeupNeeded = true;
986
987         /*
988          * Now fix the per-proclock state.
989          */
990         proclock->holdMask &= LOCKBIT_OFF(lockmode);
991         PROCLOCK_PRINT("UnGrantLock: updated", proclock);
992
993         return wakeupNeeded;
994 }
995
996 /*
997  * GrantLockLocal -- update the locallock data structures to show
998  *              the lock request has been granted.
999  *
1000  * We expect that LockAcquire made sure there is room to add a new
1001  * ResourceOwner entry.
1002  */
1003 static void
1004 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
1005 {
1006         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1007         int                     i;
1008
1009         Assert(locallock->numLockOwners < locallock->maxLockOwners);
1010         /* Count the total */
1011         locallock->nLocks++;
1012         /* Count the per-owner lock */
1013         for (i = 0; i < locallock->numLockOwners; i++)
1014         {
1015                 if (lockOwners[i].owner == owner)
1016                 {
1017                         lockOwners[i].nLocks++;
1018                         return;
1019                 }
1020         }
1021         lockOwners[i].owner = owner;
1022         lockOwners[i].nLocks = 1;
1023         locallock->numLockOwners++;
1024 }
1025
1026 /*
1027  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
1028  *              WaitOnLock on.
1029  *
1030  * proc.c needs this for the case where we are booted off the lock by
1031  * timeout, but discover that someone granted us the lock anyway.
1032  *
1033  * We could just export GrantLockLocal, but that would require including
1034  * resowner.h in lock.h, which creates circularity.
1035  */
1036 void
1037 GrantAwaitedLock(void)
1038 {
1039         GrantLockLocal(awaitedLock, awaitedOwner);
1040 }
1041
1042 /*
1043  * WaitOnLock -- wait to acquire a lock
1044  *
1045  * Caller must have set MyProc->heldLocks to reflect locks already held
1046  * on the lockable object by this process (under all XIDs).
1047  *
1048  * The locktable's masterLock must be held at entry.
1049  */
1050 static int
1051 WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
1052                    ResourceOwner owner)
1053 {
1054         LockMethod      lockMethodTable = LockMethods[lockmethodid];
1055         char       *new_status,
1056                            *old_status;
1057         size_t          len;
1058
1059         Assert(lockmethodid < NumLockMethods);
1060
1061         LOCK_PRINT("WaitOnLock: sleeping on lock",
1062                            locallock->lock, locallock->tag.mode);
1063
1064         old_status = pstrdup(get_ps_display());
1065         len = strlen(old_status);
1066         new_status = (char *) palloc(len + 8 + 1);
1067         memcpy(new_status, old_status, len);
1068         strcpy(new_status + len, " waiting");
1069         set_ps_display(new_status);
1070
1071         awaitedLock = locallock;
1072         awaitedOwner = owner;
1073
1074         /*
1075          * NOTE: Think not to put any shared-state cleanup after the call to
1076          * ProcSleep, in either the normal or failure path.  The lock state
1077          * must be fully set by the lock grantor, or by CheckDeadLock if we
1078          * give up waiting for the lock.  This is necessary because of the
1079          * possibility that a cancel/die interrupt will interrupt ProcSleep
1080          * after someone else grants us the lock, but before we've noticed it.
1081          * Hence, after granting, the locktable state must fully reflect the
1082          * fact that we own the lock; we can't do additional work on return.
1083          * Contrariwise, if we fail, any cleanup must happen in xact abort
1084          * processing, not here, to ensure it will also happen in the
1085          * cancel/die case.
1086          */
1087
1088         if (ProcSleep(lockMethodTable,
1089                                   locallock->tag.mode,
1090                                   locallock->lock,
1091                                   locallock->proclock) != STATUS_OK)
1092         {
1093                 /*
1094                  * We failed as a result of a deadlock, see CheckDeadLock(). Quit
1095                  * now.
1096                  */
1097                 awaitedLock = NULL;
1098                 LOCK_PRINT("WaitOnLock: aborting on lock",
1099                                    locallock->lock, locallock->tag.mode);
1100                 LWLockRelease(lockMethodTable->masterLock);
1101
1102                 /*
1103                  * Now that we aren't holding the LockMgrLock, we can give an
1104                  * error report including details about the detected deadlock.
1105                  */
1106                 DeadLockReport();
1107                 /* not reached */
1108         }
1109
1110         awaitedLock = NULL;
1111
1112         set_ps_display(old_status);
1113         pfree(old_status);
1114         pfree(new_status);
1115
1116         LOCK_PRINT("WaitOnLock: wakeup on lock",
1117                            locallock->lock, locallock->tag.mode);
1118         return STATUS_OK;
1119 }
1120
1121 /*
1122  * Remove a proc from the wait-queue it is on
1123  * (caller must know it is on one).
1124  *
1125  * Locktable lock must be held by caller.
1126  *
1127  * NB: this does not clean up any locallock object that may exist for the lock.
1128  */
1129 void
1130 RemoveFromWaitQueue(PGPROC *proc)
1131 {
1132         LOCK       *waitLock = proc->waitLock;
1133         PROCLOCK   *proclock = proc->waitProcLock;
1134         LOCKMODE        lockmode = proc->waitLockMode;
1135         LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1136
1137         /* Make sure proc is waiting */
1138         Assert(proc->links.next != INVALID_OFFSET);
1139         Assert(waitLock);
1140         Assert(waitLock->waitProcs.size > 0);
1141         Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
1142
1143         /* Remove proc from lock's wait queue */
1144         SHMQueueDelete(&(proc->links));
1145         waitLock->waitProcs.size--;
1146
1147         /* Undo increments of request counts by waiting process */
1148         Assert(waitLock->nRequested > 0);
1149         Assert(waitLock->nRequested > proc->waitLock->nGranted);
1150         waitLock->nRequested--;
1151         Assert(waitLock->requested[lockmode] > 0);
1152         waitLock->requested[lockmode]--;
1153         /* don't forget to clear waitMask bit if appropriate */
1154         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1155                 waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1156
1157         /* Clean up the proc's own state */
1158         proc->waitLock = NULL;
1159         proc->waitProcLock = NULL;
1160
1161         /*
1162          * Delete the proclock immediately if it represents no already-held locks.
1163          * This must happen now because if the owner of the lock decides to release
1164          * it, and the requested/granted counts then go to zero, LockRelease
1165          * expects there to be no remaining proclocks.
1166          */
1167         if (proclock->holdMask == 0)
1168         {
1169                 PROCLOCK_PRINT("RemoveFromWaitQueue: deleting proclock", proclock);
1170                 SHMQueueDelete(&proclock->lockLink);
1171                 SHMQueueDelete(&proclock->procLink);
1172                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
1173                                                                                         (void *) &(proclock->tag),
1174                                                                                         HASH_REMOVE, NULL);
1175                 if (!proclock)
1176                         elog(WARNING, "proclock table corrupted");
1177         }
1178
1179         /*
1180          * There should still be some requests for the lock ... else what were
1181          * we waiting for?  Therefore no need to delete the lock object.
1182          */
1183         Assert(waitLock->nRequested > 0);
1184
1185         /* See if any other waiters for the lock can be woken up now */
1186         ProcLockWakeup(LockMethods[lockmethodid], waitLock);
1187 }
1188
1189 /*
1190  * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
1191  *              release one 'lockmode' lock on it.
1192  *
1193  * Side Effects: find any waiting processes that are now wakable,
1194  *              grant them their requested locks and awaken them.
1195  *              (We have to grant the lock here to avoid a race between
1196  *              the waking process and any new process to
1197  *              come along and request the lock.)
1198  */
1199 bool
1200 LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
1201                         TransactionId xid, LOCKMODE lockmode)
1202 {
1203         LOCALLOCKTAG localtag;
1204         LOCALLOCK  *locallock;
1205         LOCK       *lock;
1206         PROCLOCK   *proclock;
1207         LWLockId        masterLock;
1208         LockMethod      lockMethodTable;
1209         bool            wakeupNeeded;
1210
1211 #ifdef LOCK_DEBUG
1212         if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
1213                 elog(LOG, "LockRelease: user lock [%u,%u] %s",
1214                          locktag->locktag_field1, locktag->locktag_field2,
1215                          lock_mode_names[lockmode]);
1216 #endif
1217
1218         /* ugly */
1219         locktag->locktag_lockmethodid = lockmethodid;
1220
1221         Assert(lockmethodid < NumLockMethods);
1222         lockMethodTable = LockMethods[lockmethodid];
1223         if (!lockMethodTable)
1224         {
1225                 elog(WARNING, "lockMethodTable is null in LockRelease");
1226                 return FALSE;
1227         }
1228
1229         /*
1230          * Find the LOCALLOCK entry for this lock and lockmode
1231          */
1232         MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
1233         localtag.lock = *locktag;
1234         localtag.xid = xid;
1235         localtag.mode = lockmode;
1236
1237         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
1238                                                                                   (void *) &localtag,
1239                                                                                   HASH_FIND, NULL);
1240
1241         /*
1242          * let the caller print its own error message, too. Do not
1243          * ereport(ERROR).
1244          */
1245         if (!locallock || locallock->nLocks <= 0)
1246         {
1247                 elog(WARNING, "you don't own a lock of type %s",
1248                          lock_mode_names[lockmode]);
1249                 return FALSE;
1250         }
1251
1252         /*
1253          * Decrease the count for the resource owner.
1254          */
1255         {
1256                 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1257                 ResourceOwner owner;
1258                 int                     i;
1259
1260                 /* Session locks and user locks are not transactional */
1261                 if (xid != InvalidTransactionId &&
1262                         lockmethodid == DEFAULT_LOCKMETHOD)
1263                         owner = CurrentResourceOwner;
1264                 else
1265                         owner = NULL;
1266
1267                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1268                 {
1269                         if (lockOwners[i].owner == owner)
1270                         {
1271                                 Assert(lockOwners[i].nLocks > 0);
1272                                 if (--lockOwners[i].nLocks == 0)
1273                                 {
1274                                         /* compact out unused slot */
1275                                         locallock->numLockOwners--;
1276                                         if (i < locallock->numLockOwners)
1277                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1278                                 }
1279                                 break;
1280                         }
1281                 }
1282                 if (i < 0)
1283                 {
1284                         /* don't release a lock belonging to another owner */
1285                         elog(WARNING, "you don't own a lock of type %s",
1286                                  lock_mode_names[lockmode]);
1287                         return FALSE;
1288                 }
1289         }
1290
1291         /*
1292          * Decrease the total local count.      If we're still holding the lock,
1293          * we're done.
1294          */
1295         locallock->nLocks--;
1296
1297         if (locallock->nLocks > 0)
1298                 return TRUE;
1299
1300         /*
1301          * Otherwise we've got to mess with the shared lock table.
1302          */
1303         masterLock = lockMethodTable->masterLock;
1304
1305         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1306
1307         /*
1308          * We don't need to re-find the lock or proclock, since we kept their
1309          * addresses in the locallock table, and they couldn't have been
1310          * removed while we were holding a lock on them.
1311          */
1312         lock = locallock->lock;
1313         LOCK_PRINT("LockRelease: found", lock, lockmode);
1314         proclock = locallock->proclock;
1315         PROCLOCK_PRINT("LockRelease: found", proclock);
1316
1317         /*
1318          * Double-check that we are actually holding a lock of the type we
1319          * want to release.
1320          */
1321         if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1322         {
1323                 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
1324                 LWLockRelease(masterLock);
1325                 elog(WARNING, "you don't own a lock of type %s",
1326                          lock_mode_names[lockmode]);
1327                 RemoveLocalLock(locallock);
1328                 return FALSE;
1329         }
1330
1331         wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
1332
1333         /*
1334          * If this was my last hold on this lock, delete my entry in the
1335          * proclock table.
1336          */
1337         if (proclock->holdMask == 0)
1338         {
1339                 PROCLOCK_PRINT("LockRelease: deleting proclock", proclock);
1340                 SHMQueueDelete(&proclock->lockLink);
1341                 SHMQueueDelete(&proclock->procLink);
1342                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
1343                                                                                         (void *) &(proclock->tag),
1344                                                                                         HASH_REMOVE, NULL);
1345                 if (!proclock)
1346                 {
1347                         LWLockRelease(masterLock);
1348                         elog(WARNING, "proclock table corrupted");
1349                         RemoveLocalLock(locallock);
1350                         return FALSE;
1351                 }
1352         }
1353
1354         if (lock->nRequested == 0)
1355         {
1356                 /*
1357                  * We've just released the last lock, so garbage-collect the lock
1358                  * object.
1359                  */
1360                 LOCK_PRINT("LockRelease: deleting lock", lock, lockmode);
1361                 Assert(SHMQueueEmpty(&(lock->procLocks)));
1362                 lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1363                                                                         (void *) &(lock->tag),
1364                                                                         HASH_REMOVE, NULL);
1365                 if (!lock)
1366                 {
1367                         LWLockRelease(masterLock);
1368                         elog(WARNING, "lock table corrupted");
1369                         RemoveLocalLock(locallock);
1370                         return FALSE;
1371                 }
1372         }
1373         else
1374         {
1375                 /*
1376                  * Wake up waiters if needed.
1377                  */
1378                 if (wakeupNeeded)
1379                         ProcLockWakeup(lockMethodTable, lock);
1380         }
1381
1382         LWLockRelease(masterLock);
1383
1384         RemoveLocalLock(locallock);
1385         return TRUE;
1386 }
1387
1388 /*
1389  * LockReleaseAll -- Release all locks of the specified lock method that
1390  *              are held by the current process.
1391  *
1392  * Well, not necessarily *all* locks.  The available behaviors are:
1393  *
1394  * allxids == true: release all locks regardless of transaction
1395  * affiliation.
1396  *
1397  * allxids == false: release all locks with Xid != 0
1398  * (zero is the Xid used for "session" locks).
1399  */
1400 bool
1401 LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
1402 {
1403         HASH_SEQ_STATUS status;
1404         SHM_QUEUE  *procLocks = &(MyProc->procLocks);
1405         LWLockId        masterLock;
1406         LockMethod      lockMethodTable;
1407         int                     i,
1408                                 numLockModes;
1409         LOCALLOCK  *locallock;
1410         PROCLOCK   *proclock;
1411         LOCK       *lock;
1412
1413 #ifdef LOCK_DEBUG
1414         if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1415                 elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
1416 #endif
1417
1418         Assert(lockmethodid < NumLockMethods);
1419         lockMethodTable = LockMethods[lockmethodid];
1420         if (!lockMethodTable)
1421         {
1422                 elog(WARNING, "bad lock method: %d", lockmethodid);
1423                 return FALSE;
1424         }
1425
1426         numLockModes = lockMethodTable->numLockModes;
1427         masterLock = lockMethodTable->masterLock;
1428
1429         /*
1430          * First we run through the locallock table and get rid of unwanted
1431          * entries, then we scan the process's proclocks and get rid of those.
1432          * We do this separately because we may have multiple locallock
1433          * entries pointing to the same proclock, and we daren't end up with
1434          * any dangling pointers.
1435          */
1436         hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
1437
1438         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1439         {
1440                 if (locallock->proclock == NULL || locallock->lock == NULL)
1441                 {
1442                         /*
1443                          * We must've run out of shared memory while trying to set up
1444                          * this lock.  Just forget the local entry.
1445                          */
1446                         Assert(locallock->nLocks == 0);
1447                         RemoveLocalLock(locallock);
1448                         continue;
1449                 }
1450
1451                 /* Ignore items that are not of the lockmethod to be removed */
1452                 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
1453                         continue;
1454
1455                 /*
1456                  * Ignore locks with Xid=0 unless we are asked to release all
1457                  * locks
1458                  */
1459                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
1460                         && !allxids)
1461                         continue;
1462
1463                 RemoveLocalLock(locallock);
1464         }
1465
1466         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1467
1468         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1469                                                                                  offsetof(PROCLOCK, procLink));
1470
1471         while (proclock)
1472         {
1473                 bool            wakeupNeeded = false;
1474                 PROCLOCK   *nextHolder;
1475
1476                 /* Get link first, since we may unlink/delete this proclock */
1477                 nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
1478                                                                                    offsetof(PROCLOCK, procLink));
1479
1480                 Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
1481
1482                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1483
1484                 /* Ignore items that are not of the lockmethod to be removed */
1485                 if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
1486                         goto next_item;
1487
1488                 /*
1489                  * Ignore locks with Xid=0 unless we are asked to release all
1490                  * locks
1491                  */
1492                 if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
1493                         && !allxids)
1494                         goto next_item;
1495
1496                 PROCLOCK_PRINT("LockReleaseAll", proclock);
1497                 LOCK_PRINT("LockReleaseAll", lock, 0);
1498                 Assert(lock->nRequested >= 0);
1499                 Assert(lock->nGranted >= 0);
1500                 Assert(lock->nGranted <= lock->nRequested);
1501                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
1502
1503                 /*
1504                  * fix the general lock stats
1505                  */
1506                 if (proclock->holdMask)
1507                 {
1508                         for (i = 1; i <= numLockModes; i++)
1509                         {
1510                                 if (proclock->holdMask & LOCKBIT_ON(i))
1511                                         wakeupNeeded |= UnGrantLock(lock, i, proclock,
1512                                                                                                 lockMethodTable);
1513                         }
1514                 }
1515                 Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1516                 Assert(lock->nGranted <= lock->nRequested);
1517                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1518
1519                 PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
1520
1521                 /*
1522                  * Remove the proclock entry from the linked lists
1523                  */
1524                 SHMQueueDelete(&proclock->lockLink);
1525                 SHMQueueDelete(&proclock->procLink);
1526
1527                 /*
1528                  * remove the proclock entry from the hashtable
1529                  */
1530                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
1531                                                                                         (void *) &(proclock->tag),
1532                                                                                         HASH_REMOVE,
1533                                                                                         NULL);
1534                 if (!proclock)
1535                 {
1536                         LWLockRelease(masterLock);
1537                         elog(WARNING, "proclock table corrupted");
1538                         return FALSE;
1539                 }
1540
1541                 if (lock->nRequested == 0)
1542                 {
1543                         /*
1544                          * We've just released the last lock, so garbage-collect the
1545                          * lock object.
1546                          */
1547                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1548                         Assert(SHMQueueEmpty(&(lock->procLocks)));
1549                         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1550                                                                                 (void *) &(lock->tag),
1551                                                                                 HASH_REMOVE, NULL);
1552                         if (!lock)
1553                         {
1554                                 LWLockRelease(masterLock);
1555                                 elog(WARNING, "lock table corrupted");
1556                                 return FALSE;
1557                         }
1558                 }
1559                 else if (wakeupNeeded)
1560                         ProcLockWakeup(lockMethodTable, lock);
1561
1562 next_item:
1563                 proclock = nextHolder;
1564         }
1565
1566         LWLockRelease(masterLock);
1567
1568 #ifdef LOCK_DEBUG
1569         if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1570                 elog(LOG, "LockReleaseAll done");
1571 #endif
1572
1573         return TRUE;
1574 }
1575
1576 /*
1577  * LockReleaseCurrentOwner
1578  *              Release all locks belonging to CurrentResourceOwner
1579  *
1580  * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner.
1581  */
1582 void
1583 LockReleaseCurrentOwner(void)
1584 {
1585         HASH_SEQ_STATUS status;
1586         LOCALLOCK  *locallock;
1587         LOCALLOCKOWNER *lockOwners;
1588         int                     i;
1589
1590         hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
1591
1592         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1593         {
1594                 /* Ignore items that must be nontransactional */
1595                 if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
1596                         continue;
1597                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
1598                         continue;
1599
1600                 /* Scan to see if there are any locks belonging to current owner */
1601                 lockOwners = locallock->lockOwners;
1602                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1603                 {
1604                         if (lockOwners[i].owner == CurrentResourceOwner)
1605                         {
1606                                 Assert(lockOwners[i].nLocks > 0);
1607                                 if (lockOwners[i].nLocks < locallock->nLocks)
1608                                 {
1609                                         /*
1610                                          * We will still hold this lock after forgetting this
1611                                          * ResourceOwner.
1612                                          */
1613                                         locallock->nLocks -= lockOwners[i].nLocks;
1614                                         /* compact out unused slot */
1615                                         locallock->numLockOwners--;
1616                                         if (i < locallock->numLockOwners)
1617                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1618                                 }
1619                                 else
1620                                 {
1621                                         Assert(lockOwners[i].nLocks == locallock->nLocks);
1622                                         /* We want to call LockRelease just once */
1623                                         lockOwners[i].nLocks = 1;
1624                                         locallock->nLocks = 1;
1625                                         if (!LockRelease(DEFAULT_LOCKMETHOD,
1626                                                                          &locallock->tag.lock,
1627                                                                          locallock->tag.xid,
1628                                                                          locallock->tag.mode))
1629                                                 elog(WARNING, "LockReleaseCurrentOwner: failed??");
1630                                 }
1631                                 break;
1632                         }
1633                 }
1634         }
1635 }
1636
1637 /*
1638  * LockReassignCurrentOwner
1639  *              Reassign all locks belonging to CurrentResourceOwner to belong
1640  *              to its parent resource owner
1641  */
1642 void
1643 LockReassignCurrentOwner(void)
1644 {
1645         ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
1646         HASH_SEQ_STATUS status;
1647         LOCALLOCK  *locallock;
1648         LOCALLOCKOWNER *lockOwners;
1649
1650         Assert(parent != NULL);
1651
1652         hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
1653
1654         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1655         {
1656                 int                     i;
1657                 int                     ic = -1;
1658                 int                     ip = -1;
1659
1660                 /* Ignore items that must be nontransactional */
1661                 if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
1662                         continue;
1663                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
1664                         continue;
1665
1666                 /*
1667                  * Scan to see if there are any locks belonging to current owner
1668                  * or its parent
1669                  */
1670                 lockOwners = locallock->lockOwners;
1671                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1672                 {
1673                         if (lockOwners[i].owner == CurrentResourceOwner)
1674                                 ic = i;
1675                         else if (lockOwners[i].owner == parent)
1676                                 ip = i;
1677                 }
1678
1679                 if (ic < 0)
1680                         continue;                       /* no current locks */
1681
1682                 if (ip < 0)
1683                 {
1684                         /* Parent has no slot, so just give it child's slot */
1685                         lockOwners[ic].owner = parent;
1686                 }
1687                 else
1688                 {
1689                         /* Merge child's count with parent's */
1690                         lockOwners[ip].nLocks += lockOwners[ic].nLocks;
1691                         /* compact out unused slot */
1692                         locallock->numLockOwners--;
1693                         if (ic < locallock->numLockOwners)
1694                                 lockOwners[ic] = lockOwners[locallock->numLockOwners];
1695                 }
1696         }
1697 }
1698
1699
1700 /*
1701  * Estimate shared-memory space used for lock tables
1702  */
1703 int
1704 LockShmemSize(int maxBackends)
1705 {
1706         int                     size = 0;
1707         long            max_table_size = NLOCKENTS(maxBackends);
1708
1709         /* lock method headers */
1710         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
1711
1712         /* lockHash table */
1713         size += hash_estimate_size(max_table_size, sizeof(LOCK));
1714
1715         /* proclockHash table */
1716         size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
1717
1718         /*
1719          * Note we count only one pair of hash tables, since the userlocks
1720          * table actually overlays the main one.
1721          *
1722          * Since the lockHash entry count above is only an estimate, add 10%
1723          * safety margin.
1724          */
1725         size += size / 10;
1726
1727         return size;
1728 }
1729
1730 /*
1731  * GetLockStatusData - Return a summary of the lock manager's internal
1732  * status, for use in a user-level reporting function.
1733  *
1734  * The return data consists of an array of PROCLOCK objects, with the
1735  * associated PGPROC and LOCK objects for each.  Note that multiple
1736  * copies of the same PGPROC and/or LOCK objects are likely to appear.
1737  * It is the caller's responsibility to match up duplicates if wanted.
1738  *
1739  * The design goal is to hold the LockMgrLock for as short a time as possible;
1740  * thus, this function simply makes a copy of the necessary data and releases
1741  * the lock, allowing the caller to contemplate and format the data for as
1742  * long as it pleases.
1743  */
1744 LockData *
1745 GetLockStatusData(void)
1746 {
1747         LockData   *data;
1748         HTAB       *proclockTable;
1749         PROCLOCK   *proclock;
1750         HASH_SEQ_STATUS seqstat;
1751         int                     i;
1752
1753         data = (LockData *) palloc(sizeof(LockData));
1754
1755         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
1756
1757         proclockTable = LockMethodProcLockHash[DEFAULT_LOCKMETHOD];
1758
1759         data->nelements = i = proclockTable->hctl->nentries;
1760
1761         data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
1762         data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
1763         data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
1764         data->locks = (LOCK *) palloc(sizeof(LOCK) * i);
1765
1766         hash_seq_init(&seqstat, proclockTable);
1767
1768         i = 0;
1769         while ((proclock = hash_seq_search(&seqstat)))
1770         {
1771                 PGPROC     *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
1772                 LOCK       *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1773
1774                 data->proclockaddrs[i] = MAKE_OFFSET(proclock);
1775                 memcpy(&(data->proclocks[i]), proclock, sizeof(PROCLOCK));
1776                 memcpy(&(data->procs[i]), proc, sizeof(PGPROC));
1777                 memcpy(&(data->locks[i]), lock, sizeof(LOCK));
1778
1779                 i++;
1780         }
1781
1782         LWLockRelease(LockMgrLock);
1783
1784         Assert(i == data->nelements);
1785
1786         return data;
1787 }
1788
1789 /* Provide the textual name of any lock mode */
1790 const char *
1791 GetLockmodeName(LOCKMODE mode)
1792 {
1793         Assert(mode <= MAX_LOCKMODES);
1794         return lock_mode_names[mode];
1795 }
1796
1797 #ifdef LOCK_DEBUG
1798 /*
1799  * Dump all locks in the MyProc->procLocks list.
1800  *
1801  * Must have already acquired the masterLock.
1802  */
1803 void
1804 DumpLocks(void)
1805 {
1806         PGPROC     *proc;
1807         SHM_QUEUE  *procLocks;
1808         PROCLOCK   *proclock;
1809         LOCK       *lock;
1810         int                     lockmethodid = DEFAULT_LOCKMETHOD;
1811         LockMethod      lockMethodTable;
1812
1813         proc = MyProc;
1814         if (proc == NULL)
1815                 return;
1816
1817         procLocks = &proc->procLocks;
1818
1819         Assert(lockmethodid < NumLockMethods);
1820         lockMethodTable = LockMethods[lockmethodid];
1821         if (!lockMethodTable)
1822                 return;
1823
1824         if (proc->waitLock)
1825                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1826
1827         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1828                                                                                  offsetof(PROCLOCK, procLink));
1829
1830         while (proclock)
1831         {
1832                 Assert(proclock->tag.proc == MAKE_OFFSET(proc));
1833
1834                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1835
1836                 PROCLOCK_PRINT("DumpLocks", proclock);
1837                 LOCK_PRINT("DumpLocks", lock, 0);
1838
1839                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
1840                                                                                    offsetof(PROCLOCK, procLink));
1841         }
1842 }
1843
1844 /*
1845  * Dump all postgres locks. Must have already acquired the masterLock.
1846  */
1847 void
1848 DumpAllLocks(void)
1849 {
1850         PGPROC     *proc;
1851         PROCLOCK   *proclock;
1852         LOCK       *lock;
1853         int                     lockmethodid = DEFAULT_LOCKMETHOD;
1854         LockMethod      lockMethodTable;
1855         HTAB       *proclockTable;
1856         HASH_SEQ_STATUS status;
1857
1858         proc = MyProc;
1859         if (proc == NULL)
1860                 return;
1861
1862         Assert(lockmethodid < NumLockMethods);
1863         lockMethodTable = LockMethods[lockmethodid];
1864         if (!lockMethodTable)
1865                 return;
1866
1867         proclockTable = LockMethodProcLockHash[lockmethodid];
1868
1869         if (proc->waitLock)
1870                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1871
1872         hash_seq_init(&status, proclockTable);
1873         while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
1874         {
1875                 PROCLOCK_PRINT("DumpAllLocks", proclock);
1876
1877                 if (proclock->tag.lock)
1878                 {
1879                         lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1880                         LOCK_PRINT("DumpAllLocks", lock, 0);
1881                 }
1882                 else
1883                         elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
1884         }
1885 }
1886
1887 #endif   /* LOCK_DEBUG */