]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Tag appropriate files for rc3
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.145 2004/12/31 22:01:05 pgsql Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll(),
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <signal.h>
34 #include <unistd.h>
35
36 #include "access/xact.h"
37 #include "miscadmin.h"
38 #include "storage/proc.h"
39 #include "utils/memutils.h"
40 #include "utils/ps_status.h"
41 #include "utils/resowner.h"
42
43
44 /* This configuration variable is used to set the lock table size */
45 int                     max_locks_per_xact; /* set by guc.c */
46
47 #define NLOCKENTS(maxBackends)  (max_locks_per_xact * (maxBackends))
48
49
50 /*
51  * map from lock method id to the lock table data structures
52  */
53 static LockMethod LockMethods[MAX_LOCK_METHODS];
54 static HTAB *LockMethodLockHash[MAX_LOCK_METHODS];
55 static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS];
56 static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS];
57
58 /* exported so lmgr.c can initialize it */
59 int                     NumLockMethods;
60
61
62 /* private state for GrantAwaitedLock */
63 static LOCALLOCK *awaitedLock;
64 static ResourceOwner awaitedOwner;
65
66
/*
 * Printable lock mode names, looked up by lock mode number.
 * Entry 0 is the "INVALID" placeholder.
 */
static const char *const lock_mode_names[] =
{
	"INVALID",					/* 0 */
	"AccessShareLock",			/* 1 */
	"RowShareLock",				/* 2 */
	"RowExclusiveLock",			/* 3 */
	"ShareUpdateExclusiveLock", /* 4 */
	"ShareLock",				/* 5 */
	"ShareRowExclusiveLock",	/* 6 */
	"ExclusiveLock",			/* 7 */
	"AccessExclusiveLock"		/* 8 */
};
79
80
81 #ifdef LOCK_DEBUG
82
83 /*------
84  * The following configuration options are available for lock debugging:
85  *
86  *         TRACE_LOCKS          -- emit detailed output about what's going on in this file
87  *         TRACE_USERLOCKS      -- same, but for user locks
88  *         TRACE_LOCK_OIDMIN    -- do not trace locks for tables below this oid
89  *                                                 (use to avoid output on system tables)
90  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
91  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
92  *
93  * Furthermore, but in storage/lmgr/lwlock.c:
94  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
95  *
96  * Define LOCK_DEBUG at compile time to get all these enabled.
97  * --------
98  */
99
100 int                     Trace_lock_oidmin = BootstrapObjectIdData;
101 bool            Trace_locks = false;
102 bool            Trace_userlocks = false;
103 int                     Trace_lock_table = 0;
104 bool            Debug_deadlocks = false;
105
106
107 inline static bool
108 LOCK_DEBUG_ENABLED(const LOCK *lock)
109 {
110         return
111                 (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
112            || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
113                  && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
114                 || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
115 }
116
117
118 inline static void
119 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
120 {
121         if (LOCK_DEBUG_ENABLED(lock))
122                 elog(LOG,
123                          "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
124                          "req(%d,%d,%d,%d,%d,%d,%d)=%d "
125                          "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
126                          where, MAKE_OFFSET(lock),
127                          lock->tag.lockmethodid, lock->tag.relId, lock->tag.dbId,
128                          lock->tag.objId.blkno, lock->grantMask,
129                          lock->requested[1], lock->requested[2], lock->requested[3],
130                          lock->requested[4], lock->requested[5], lock->requested[6],
131                          lock->requested[7], lock->nRequested,
132                          lock->granted[1], lock->granted[2], lock->granted[3],
133                          lock->granted[4], lock->granted[5], lock->granted[6],
134                          lock->granted[7], lock->nGranted,
135                          lock->waitProcs.size, lock_mode_names[type]);
136 }
137
138
139 inline static void
140 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
141 {
142         if (
143                 (((PROCLOCK_LOCKMETHOD(*proclockP) == DEFAULT_LOCKMETHOD && Trace_locks)
144                   || (PROCLOCK_LOCKMETHOD(*proclockP) == USER_LOCKMETHOD && Trace_userlocks))
145                  && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
146                 || (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table))
147                 )
148                 elog(LOG,
149                 "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%x)",
150                          where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
151                          PROCLOCK_LOCKMETHOD(*(proclockP)),
152                          proclockP->tag.proc, proclockP->tag.xid,
153                          (int) proclockP->holdMask);
154 }
155
156 #else                                                   /* not LOCK_DEBUG */
157
158 #define LOCK_PRINT(where, lock, type)
159 #define PROCLOCK_PRINT(where, proclockP)
160 #endif   /* not LOCK_DEBUG */
161
162
163 static void RemoveLocalLock(LOCALLOCK *locallock);
164 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
165 static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
166                    ResourceOwner owner);
167 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
168                                  int *myHolding);
169
170
/*
 * InitLocks -- lock module initialization hook.
 *
 * Currently there is nothing to set up here, so the function body is
 * intentionally empty.
 */
void
InitLocks(void)
{
	/* nothing to do */
}
180
181
182 /*
183  * Fetch the lock method table associated with a given lock
184  */
185 LockMethod
186 GetLocksMethodTable(LOCK *lock)
187 {
188         LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
189
190         Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
191         return LockMethods[lockmethodid];
192 }
193
194
195 /*
196  * LockMethodInit -- initialize the lock table's lock type
197  *              structures
198  *
199  * Notes: just copying.  Should only be called once.
200  */
201 static void
202 LockMethodInit(LockMethod lockMethodTable,
203                            const LOCKMASK *conflictsP,
204                            int numModes)
205 {
206         int                     i;
207
208         lockMethodTable->numLockModes = numModes;
209         /* copies useless zero element as well as the N lockmodes */
210         for (i = 0; i <= numModes; i++)
211                 lockMethodTable->conflictTab[i] = conflictsP[i];
212 }
213
214 /*
215  * LockMethodTableInit -- initialize a lock table structure
216  *
217  * NOTE: data structures allocated here are allocated permanently, using
218  * TopMemoryContext and shared memory.  We don't ever release them anyway,
219  * and in normal multi-backend operation the lock table structures set up
220  * by the postmaster are inherited by each backend, so they must be in
221  * TopMemoryContext.
222  */
223 LOCKMETHODID
224 LockMethodTableInit(const char *tabName,
225                                         const LOCKMASK *conflictsP,
226                                         int numModes,
227                                         int maxBackends)
228 {
229         LockMethod      newLockMethod;
230         LOCKMETHODID lockmethodid;
231         char       *shmemName;
232         HASHCTL         info;
233         int                     hash_flags;
234         bool            found;
235         long            init_table_size,
236                                 max_table_size;
237
238         if (numModes >= MAX_LOCKMODES)
239                 elog(ERROR, "too many lock types %d (limit is %d)",
240                          numModes, MAX_LOCKMODES - 1);
241
242         /* Compute init/max size to request for lock hashtables */
243         max_table_size = NLOCKENTS(maxBackends);
244         init_table_size = max_table_size / 2;
245
246         /* Allocate a string for the shmem index table lookups. */
247         /* This is just temp space in this routine, so palloc is OK. */
248         shmemName = (char *) palloc(strlen(tabName) + 32);
249
250         /* each lock table has a header in shared memory */
251         sprintf(shmemName, "%s (lock method table)", tabName);
252         newLockMethod = (LockMethod)
253                 ShmemInitStruct(shmemName, sizeof(LockMethodData), &found);
254
255         if (!newLockMethod)
256                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
257
258         /*
259          * we're first - initialize
260          */
261         if (!found)
262         {
263                 MemSet(newLockMethod, 0, sizeof(LockMethodData));
264                 newLockMethod->masterLock = LockMgrLock;
265                 LockMethodInit(newLockMethod, conflictsP, numModes);
266         }
267
268         /*
269          * other modules refer to the lock table by a lockmethod ID
270          */
271         Assert(NumLockMethods < MAX_LOCK_METHODS);
272         lockmethodid = NumLockMethods++;
273         LockMethods[lockmethodid] = newLockMethod;
274
275         /*
276          * allocate a hash table for LOCK structs.      This is used to store
277          * per-locked-object information.
278          */
279         MemSet(&info, 0, sizeof(info));
280         info.keysize = sizeof(LOCKTAG);
281         info.entrysize = sizeof(LOCK);
282         info.hash = tag_hash;
283         hash_flags = (HASH_ELEM | HASH_FUNCTION);
284
285         sprintf(shmemName, "%s (lock hash)", tabName);
286         LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName,
287                                                                                                          init_table_size,
288                                                                                                          max_table_size,
289                                                                                                          &info,
290                                                                                                          hash_flags);
291
292         if (!LockMethodLockHash[lockmethodid])
293                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
294
295         /*
296          * allocate a hash table for PROCLOCK structs.  This is used to store
297          * per-lock-holder information.
298          */
299         info.keysize = sizeof(PROCLOCKTAG);
300         info.entrysize = sizeof(PROCLOCK);
301         info.hash = tag_hash;
302         hash_flags = (HASH_ELEM | HASH_FUNCTION);
303
304         sprintf(shmemName, "%s (proclock hash)", tabName);
305         LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName,
306                                                                                                                  init_table_size,
307                                                                                                                  max_table_size,
308                                                                                                                  &info,
309                                                                                                                  hash_flags);
310
311         if (!LockMethodProcLockHash[lockmethodid])
312                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
313
314         /*
315          * allocate a non-shared hash table for LOCALLOCK structs.      This is
316          * used to store lock counts and resource owner information.
317          *
318          * The non-shared table could already exist in this process (this occurs
319          * when the postmaster is recreating shared memory after a backend
320          * crash). If so, delete and recreate it.  (We could simply leave it,
321          * since it ought to be empty in the postmaster, but for safety let's
322          * zap it.)
323          */
324         if (LockMethodLocalHash[lockmethodid])
325                 hash_destroy(LockMethodLocalHash[lockmethodid]);
326
327         info.keysize = sizeof(LOCALLOCKTAG);
328         info.entrysize = sizeof(LOCALLOCK);
329         info.hash = tag_hash;
330         hash_flags = (HASH_ELEM | HASH_FUNCTION);
331
332         sprintf(shmemName, "%s (locallock hash)", tabName);
333         LockMethodLocalHash[lockmethodid] = hash_create(shmemName,
334                                                                                                         128,
335                                                                                                         &info,
336                                                                                                         hash_flags);
337
338         pfree(shmemName);
339
340         return lockmethodid;
341 }
342
343 /*
344  * LockMethodTableRename -- allocate another lockmethod ID to the same
345  *              lock table.
346  *
347  * NOTES: Both the lock module and the lock chain (lchain.c)
348  *              module use table id's to distinguish between different
349  *              kinds of locks.  Short term and long term locks look
350  *              the same to the lock table, but are handled differently
351  *              by the lock chain manager.      This function allows the
352  *              client to use different lockmethods when acquiring/releasing
353  *              short term and long term locks, yet store them all in one hashtable.
354  */
355
356 LOCKMETHODID
357 LockMethodTableRename(LOCKMETHODID lockmethodid)
358 {
359         LOCKMETHODID newLockMethodId;
360
361         if (NumLockMethods >= MAX_LOCK_METHODS)
362                 return INVALID_LOCKMETHOD;
363         if (LockMethods[lockmethodid] == INVALID_LOCKMETHOD)
364                 return INVALID_LOCKMETHOD;
365
366         /* other modules refer to the lock table by a lockmethod ID */
367         newLockMethodId = NumLockMethods;
368         NumLockMethods++;
369
370         LockMethods[newLockMethodId] = LockMethods[lockmethodid];
371         LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid];
372         LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid];
373         LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid];
374
375         return newLockMethodId;
376 }
377
378 /*
379  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
380  *              set lock if/when no conflicts.
381  *
382  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
383  *              a FALSE return is to be expected if dontWait is TRUE;
384  *              but if dontWait is FALSE, only a parameter error can cause
385  *              a FALSE return.  (XXX probably we should just ereport on parameter
386  *              errors, instead of conflating this with failure to acquire lock?)
387  *
388  * Side Effects: The lock is acquired and recorded in lock tables.
389  *
390  * NOTE: if we wait for the lock, there is no way to abort the wait
391  * short of aborting the transaction.
392  *
393  *
394  * Note on User Locks:
395  *
396  *              User locks are handled totally on the application side as
397  *              long term cooperative locks which extend beyond the normal
398  *              transaction boundaries.  Their purpose is to indicate to an
399  *              application that someone is `working' on an item.  So it is
400  *              possible to put a user lock on a tuple's oid, retrieve the
401  *              tuple, work on it for an hour and then update it and remove
402  *              the lock.  While the lock is active other clients can still
403  *              read and write the tuple but they can be aware that it has
404  *              been locked at the application level by someone.
405  *              User locks use lock tags made of a uint16 and a uint32, for
406  *              example 0 and a tuple oid, or any other arbitrary pair of
407  *              numbers following a convention established by the application.
408  *              In this sense tags don't refer to tuples or database entities.
409  *              User locks and normal locks are completely orthogonal and
410  *              they don't interfere with each other, so it is possible
411  *              to acquire a normal lock on a user-locked tuple or user-lock
412  *              a tuple for which a normal write lock already exists.
413  *              User locks are always non-blocking; therefore they are never
414  *              acquired if already held by another process.  They must be
415  *              released explicitly by the application, but they are released
416  *              automatically when a backend terminates.
417  *              They are indicated by a lockmethod 2 which is an alias for the
418  *              normal lock table, and are distinguished from normal locks
419  *              by the following differences:
420  *
421  *                                                                              normal lock             user lock
422  *
423  *              lockmethodid                                    1                               2
424  *              tag.dbId                                                database oid    database oid
425  *              tag.relId                                               rel oid or 0    0
426  *              tag.objId                                               block id                lock id2
427  *                                                                              or xact id
428  *              tag.offnum                                              0                               lock id1
429  *              proclock.xid                                    xid or 0                0
430  *              persistence                                             transaction             user or backend
431  *                                                                              or backend
432  *
433  *              The lockmode parameter can have the same values for normal locks
434  *              although probably only WRITE_LOCK can have some practical use.
435  *
436  *                                                                                                              DZ - 22 Nov 1997
437  */
438
439 bool
440 LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
441                         TransactionId xid, LOCKMODE lockmode, bool dontWait)
442 {
443         LOCALLOCKTAG localtag;
444         LOCALLOCK  *locallock;
445         LOCK       *lock;
446         PROCLOCK   *proclock;
447         PROCLOCKTAG proclocktag;
448         bool            found;
449         ResourceOwner owner;
450         LWLockId        masterLock;
451         LockMethod      lockMethodTable;
452         int                     status;
453         int                     myHolding[MAX_LOCKMODES];
454         int                     i;
455
456 #ifdef LOCK_DEBUG
457         if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks)
458                 elog(LOG, "LockAcquire: user lock [%u] %s",
459                          locktag->objId.blkno, lock_mode_names[lockmode]);
460 #endif
461
462         /* ???????? This must be changed when short term locks will be used */
463         locktag->lockmethodid = lockmethodid;
464
465         Assert(lockmethodid < NumLockMethods);
466         lockMethodTable = LockMethods[lockmethodid];
467         if (!lockMethodTable)
468         {
469                 elog(WARNING, "bad lock table id: %d", lockmethodid);
470                 return FALSE;
471         }
472
473         /* Session locks and user locks are not transactional */
474         if (xid != InvalidTransactionId &&
475                 lockmethodid == DEFAULT_LOCKMETHOD)
476                 owner = CurrentResourceOwner;
477         else
478                 owner = NULL;
479
480         /*
481          * Find or create a LOCALLOCK entry for this lock and lockmode
482          */
483         MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
484         localtag.lock = *locktag;
485         localtag.xid = xid;
486         localtag.mode = lockmode;
487
488         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
489                                                                                   (void *) &localtag,
490                                                                                   HASH_ENTER, &found);
491         if (!locallock)
492                 ereport(ERROR,
493                                 (errcode(ERRCODE_OUT_OF_MEMORY),
494                                  errmsg("out of memory")));
495
496         /*
497          * if it's a new locallock object, initialize it
498          */
499         if (!found)
500         {
501                 locallock->lock = NULL;
502                 locallock->proclock = NULL;
503                 locallock->nLocks = 0;
504                 locallock->numLockOwners = 0;
505                 locallock->maxLockOwners = 8;
506                 locallock->lockOwners = NULL;
507                 locallock->lockOwners = (LOCALLOCKOWNER *)
508                         MemoryContextAlloc(TopMemoryContext,
509                                           locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
510         }
511         else
512         {
513                 /* Make sure there will be room to remember the lock */
514                 if (locallock->numLockOwners >= locallock->maxLockOwners)
515                 {
516                         int                     newsize = locallock->maxLockOwners * 2;
517
518                         locallock->lockOwners = (LOCALLOCKOWNER *)
519                                 repalloc(locallock->lockOwners,
520                                                  newsize * sizeof(LOCALLOCKOWNER));
521                         locallock->maxLockOwners = newsize;
522                 }
523         }
524
525         /*
526          * If we already hold the lock, we can just increase the count
527          * locally.
528          */
529         if (locallock->nLocks > 0)
530         {
531                 GrantLockLocal(locallock, owner);
532                 return TRUE;
533         }
534
535         /*
536          * Otherwise we've got to mess with the shared lock table.
537          */
538         masterLock = lockMethodTable->masterLock;
539
540         LWLockAcquire(masterLock, LW_EXCLUSIVE);
541
542         /*
543          * Find or create a lock with this tag.
544          *
545          * Note: if the locallock object already existed, it might have a pointer
546          * to the lock already ... but we probably should not assume that that
547          * pointer is valid, since a lock object with no locks can go away
548          * anytime.
549          */
550         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
551                                                                 (void *) locktag,
552                                                                 HASH_ENTER, &found);
553         if (!lock)
554         {
555                 LWLockRelease(masterLock);
556                 ereport(ERROR,
557                                 (errcode(ERRCODE_OUT_OF_MEMORY),
558                                  errmsg("out of shared memory"),
559                 errhint("You may need to increase max_locks_per_transaction.")));
560         }
561         locallock->lock = lock;
562
563         /*
564          * if it's a new lock object, initialize it
565          */
566         if (!found)
567         {
568                 lock->grantMask = 0;
569                 lock->waitMask = 0;
570                 SHMQueueInit(&(lock->procLocks));
571                 ProcQueueInit(&(lock->waitProcs));
572                 lock->nRequested = 0;
573                 lock->nGranted = 0;
574                 MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
575                 MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
576                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
577         }
578         else
579         {
580                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
581                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
582                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
583                 Assert(lock->nGranted <= lock->nRequested);
584         }
585
586         /*
587          * Create the hash key for the proclock table.
588          */
589         MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));           /* must clear padding */
590         proclocktag.lock = MAKE_OFFSET(lock);
591         proclocktag.proc = MAKE_OFFSET(MyProc);
592         TransactionIdStore(xid, &proclocktag.xid);
593
594         /*
595          * Find or create a proclock entry with this tag
596          */
597         proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
598                                                                                 (void *) &proclocktag,
599                                                                                 HASH_ENTER, &found);
600         if (!proclock)
601         {
602                 /* Ooops, not enough shmem for the proclock */
603                 if (lock->nRequested == 0)
604                 {
605                         /*
606                          * There are no other requestors of this lock, so garbage-collect
607                          * the lock object.  We *must* do this to avoid a permanent leak
608                          * of shared memory, because there won't be anything to cause
609                          * anyone to release the lock object later.
610                          */
611                         Assert(SHMQueueEmpty(&(lock->procLocks)));
612                         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
613                                                                                 (void *) &(lock->tag),
614                                                                                 HASH_REMOVE, NULL);
615                 }
616                 LWLockRelease(masterLock);
617                 if (!lock)                              /* hash remove failed? */
618                         elog(WARNING, "lock table corrupted");
619                 ereport(ERROR,
620                                 (errcode(ERRCODE_OUT_OF_MEMORY),
621                                  errmsg("out of shared memory"),
622                 errhint("You may need to increase max_locks_per_transaction.")));
623         }
624         locallock->proclock = proclock;
625
626         /*
627          * If new, initialize the new entry
628          */
629         if (!found)
630         {
631                 proclock->holdMask = 0;
632                 /* Add proclock to appropriate lists */
633                 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
634                 SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
635                 PROCLOCK_PRINT("LockAcquire: new", proclock);
636         }
637         else
638         {
639                 PROCLOCK_PRINT("LockAcquire: found", proclock);
640                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
641
642 #ifdef CHECK_DEADLOCK_RISK
643
644                 /*
645                  * Issue warning if we already hold a lower-level lock on this
646                  * object and do not hold a lock of the requested level or higher.
647                  * This indicates a deadlock-prone coding practice (eg, we'd have
648                  * a deadlock if another backend were following the same code path
649                  * at about the same time).
650                  *
651                  * This is not enabled by default, because it may generate log
652                  * entries about user-level coding practices that are in fact safe
653                  * in context. It can be enabled to help find system-level
654                  * problems.
655                  *
656                  * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
657                  * better to use a table.  For now, though, this works.
658                  */
659                 for (i = lockMethodTable->numLockModes; i > 0; i--)
660                 {
661                         if (proclock->holdMask & LOCKBIT_ON(i))
662                         {
663                                 if (i >= (int) lockmode)
664                                         break;          /* safe: we have a lock >= req level */
665                                 elog(LOG, "deadlock risk: raising lock level"
666                                          " from %s to %s on object %u/%u/%u",
667                                          lock_mode_names[i], lock_mode_names[lockmode],
668                                  lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
669                                 break;
670                         }
671                 }
672 #endif   /* CHECK_DEADLOCK_RISK */
673         }
674
675         /*
676          * lock->nRequested and lock->requested[] count the total number of
677          * requests, whether granted or waiting, so increment those
678          * immediately. The other counts don't increment till we get the lock.
679          */
680         lock->nRequested++;
681         lock->requested[lockmode]++;
682         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
683
684         /*
685          * If this process (under any XID) is a holder of the lock, just grant
686          * myself another one without blocking.
687          */
688         LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
689         if (myHolding[lockmode] > 0)
690         {
691                 GrantLock(lock, proclock, lockmode);
692                 GrantLockLocal(locallock, owner);
693                 PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
694                 LWLockRelease(masterLock);
695                 return TRUE;
696         }
697
698         /*
699          * If lock requested conflicts with locks requested by waiters, must
700          * join wait queue.  Otherwise, check for conflict with already-held
701          * locks.  (That's last because most complex check.)
702          */
703         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
704                 status = STATUS_FOUND;
705         else
706                 status = LockCheckConflicts(lockMethodTable, lockmode,
707                                                                         lock, proclock,
708                                                                         MyProc, myHolding);
709
710         if (status == STATUS_OK)
711         {
712                 /* No conflict with held or previously requested locks */
713                 GrantLock(lock, proclock, lockmode);
714                 GrantLockLocal(locallock, owner);
715         }
716         else
717         {
718                 Assert(status == STATUS_FOUND);
719
720                 /*
721                  * We can't acquire the lock immediately.  If caller specified no
722                  * blocking, remove useless table entries and return FALSE without
723                  * waiting.
724                  */
725                 if (dontWait)
726                 {
727                         if (proclock->holdMask == 0)
728                         {
729                                 SHMQueueDelete(&proclock->lockLink);
730                                 SHMQueueDelete(&proclock->procLink);
731                                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
732                                                                                            (void *) &(proclock->tag),
733                                                                                                         HASH_REMOVE, NULL);
734                                 if (!proclock)
735                                         elog(WARNING, "proclock table corrupted");
736                         }
737                         else
738                                 PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
739                         lock->nRequested--;
740                         lock->requested[lockmode]--;
741                         LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
742                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
743                         Assert(lock->nGranted <= lock->nRequested);
744                         LWLockRelease(masterLock);
745                         if (locallock->nLocks == 0)
746                                 RemoveLocalLock(locallock);
747                         return FALSE;
748                 }
749
750                 /*
751                  * Construct bitmask of locks this process holds on this object.
752                  */
753                 {
754                         LOCKMASK        heldLocks = 0;
755
756                         for (i = 1; i <= lockMethodTable->numLockModes; i++)
757                         {
758                                 if (myHolding[i] > 0)
759                                         heldLocks |= LOCKBIT_ON(i);
760                         }
761                         MyProc->heldLocks = heldLocks;
762                 }
763
764                 /*
765                  * Sleep till someone wakes me up.
766                  */
767                 status = WaitOnLock(lockmethodid, locallock, owner);
768
769                 /*
770                  * NOTE: do not do any material change of state between here and
771                  * return.      All required changes in locktable state must have been
772                  * done when the lock was granted to us --- see notes in
773                  * WaitOnLock.
774                  */
775
776                 /*
777                  * Check the proclock entry status, in case something in the ipc
778                  * communication doesn't work correctly.
779                  */
780                 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
781                 {
782                         PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
783                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
784                         /* Should we retry ? */
785                         LWLockRelease(masterLock);
786                         return FALSE;
787                 }
788                 PROCLOCK_PRINT("LockAcquire: granted", proclock);
789                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
790         }
791
792         LWLockRelease(masterLock);
793
794         return status == STATUS_OK;
795 }
796
797 /*
798  * Subroutine to free a locallock entry
799  */
800 static void
801 RemoveLocalLock(LOCALLOCK *locallock)
802 {
803         LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
804
805         pfree(locallock->lockOwners);
806         locallock->lockOwners = NULL;
807         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
808                                                                                   (void *) &(locallock->tag),
809                                                                                   HASH_REMOVE, NULL);
810         if (!locallock)
811                 elog(WARNING, "locallock table corrupted");
812 }
813
814 /*
815  * LockCheckConflicts -- test whether requested lock conflicts
816  *              with those already granted
817  *
818  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
819  *
820  * NOTES:
821  *              Here's what makes this complicated: one process's locks don't
822  * conflict with one another, even if they are held under different
823  * transaction IDs (eg, session and xact locks do not conflict).
824  * So, we must subtract off our own locks when determining whether the
825  * requested new lock conflicts with those already held.
826  *
827  * The caller can optionally pass the process's total holding counts, if
828  * known.  If NULL is passed then these values will be computed internally.
829  */
830 int
831 LockCheckConflicts(LockMethod lockMethodTable,
832                                    LOCKMODE lockmode,
833                                    LOCK *lock,
834                                    PROCLOCK *proclock,
835                                    PGPROC *proc,
836                                    int *myHolding)              /* myHolding[] array or NULL */
837 {
838         int                     numLockModes = lockMethodTable->numLockModes;
839         LOCKMASK        bitmask;
840         int                     i;
841         int                     localHolding[MAX_LOCKMODES];
842
843         /*
844          * first check for global conflicts: If no locks conflict with my
845          * request, then I get the lock.
846          *
847          * Checking for conflict: lock->grantMask represents the types of
848          * currently held locks.  conflictTable[lockmode] has a bit set for
849          * each type of lock that conflicts with request.       Bitwise compare
850          * tells if there is a conflict.
851          */
852         if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
853         {
854                 PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
855                 return STATUS_OK;
856         }
857
858         /*
859          * Rats.  Something conflicts. But it could still be my own lock.  We
860          * have to construct a conflict mask that does not reflect our own
861          * locks.  Locks held by the current process under another XID also
862          * count as "our own locks".
863          */
864         if (myHolding == NULL)
865         {
866                 /* Caller didn't do calculation of total holding for me */
867                 LockCountMyLocks(proclock->tag.lock, proc, localHolding);
868                 myHolding = localHolding;
869         }
870
871         /* Compute mask of lock types held by other processes */
872         bitmask = 0;
873         for (i = 1; i <= numLockModes; i++)
874         {
875                 if (lock->granted[i] != myHolding[i])
876                         bitmask |= LOCKBIT_ON(i);
877         }
878
879         /*
880          * now check again for conflicts.  'bitmask' describes the types of
881          * locks held by other processes.  If one of these conflicts with the
882          * kind of lock that I want, there is a conflict and I have to sleep.
883          */
884         if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
885         {
886                 /* no conflict. OK to get the lock */
887                 PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
888                 return STATUS_OK;
889         }
890
891         PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
892         return STATUS_FOUND;
893 }
894
895 /*
896  * LockCountMyLocks --- Count total number of locks held on a given lockable
897  *              object by a given process (under any transaction ID).
898  *
899  * XXX This could be rather slow if the process holds a large number of locks.
900  * Perhaps it could be sped up if we kept yet a third hashtable of per-
901  * process lock information.  However, for the normal case where a transaction
902  * doesn't hold a large number of locks, keeping such a table would probably
903  * be a net slowdown.
904  */
905 static void
906 LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
907 {
908         SHM_QUEUE  *procLocks = &(proc->procLocks);
909         PROCLOCK   *proclock;
910
911         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
912
913         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
914                                                                                  offsetof(PROCLOCK, procLink));
915
916         while (proclock)
917         {
918                 if (lockOffset == proclock->tag.lock)
919                 {
920                         LOCKMASK        holdMask = proclock->holdMask;
921                         int                     i;
922
923                         for (i = 1; i < MAX_LOCKMODES; i++)
924                         {
925                                 if (holdMask & LOCKBIT_ON(i))
926                                         myHolding[i]++;
927                         }
928                 }
929
930                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
931                                                                                    offsetof(PROCLOCK, procLink));
932         }
933 }
934
935 /*
936  * GrantLock -- update the lock and proclock data structures to show
937  *              the lock request has been granted.
938  *
939  * NOTE: if proc was blocked, it also needs to be removed from the wait list
940  * and have its waitLock/waitProcLock fields cleared.  That's not done here.
941  *
942  * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
943  * table entry; but since we may be awaking some other process, we can't do
944  * that here; it's done by GrantLockLocal, instead.
945  */
946 void
947 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
948 {
949         lock->nGranted++;
950         lock->granted[lockmode]++;
951         lock->grantMask |= LOCKBIT_ON(lockmode);
952         if (lock->granted[lockmode] == lock->requested[lockmode])
953                 lock->waitMask &= LOCKBIT_OFF(lockmode);
954         proclock->holdMask |= LOCKBIT_ON(lockmode);
955         LOCK_PRINT("GrantLock", lock, lockmode);
956         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
957         Assert(lock->nGranted <= lock->nRequested);
958 }
959
960 /*
961  * GrantLockLocal -- update the locallock data structures to show
962  *              the lock request has been granted.
963  *
964  * We expect that LockAcquire made sure there is room to add a new
965  * ResourceOwner entry.
966  */
967 static void
968 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
969 {
970         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
971         int                     i;
972
973         Assert(locallock->numLockOwners < locallock->maxLockOwners);
974         /* Count the total */
975         locallock->nLocks++;
976         /* Count the per-owner lock */
977         for (i = 0; i < locallock->numLockOwners; i++)
978         {
979                 if (lockOwners[i].owner == owner)
980                 {
981                         lockOwners[i].nLocks++;
982                         return;
983                 }
984         }
985         lockOwners[i].owner = owner;
986         lockOwners[i].nLocks = 1;
987         locallock->numLockOwners++;
988 }
989
990 /*
991  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
992  *              WaitOnLock on.
993  *
994  * proc.c needs this for the case where we are booted off the lock by
995  * timeout, but discover that someone granted us the lock anyway.
996  *
997  * We could just export GrantLockLocal, but that would require including
998  * resowner.h in lock.h, which creates circularity.
999  */
1000 void
1001 GrantAwaitedLock(void)
1002 {
1003         GrantLockLocal(awaitedLock, awaitedOwner);
1004 }
1005
1006 /*
1007  * WaitOnLock -- wait to acquire a lock
1008  *
1009  * Caller must have set MyProc->heldLocks to reflect locks already held
1010  * on the lockable object by this process (under all XIDs).
1011  *
1012  * The locktable's masterLock must be held at entry.
1013  */
1014 static int
1015 WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
1016                    ResourceOwner owner)
1017 {
1018         LockMethod      lockMethodTable = LockMethods[lockmethodid];
1019         char       *new_status,
1020                            *old_status;
1021
1022         Assert(lockmethodid < NumLockMethods);
1023
1024         LOCK_PRINT("WaitOnLock: sleeping on lock",
1025                            locallock->lock, locallock->tag.mode);
1026
1027         old_status = pstrdup(get_ps_display());
1028         new_status = (char *) palloc(strlen(old_status) + 10);
1029         strcpy(new_status, old_status);
1030         strcat(new_status, " waiting");
1031         set_ps_display(new_status);
1032
1033         awaitedLock = locallock;
1034         awaitedOwner = owner;
1035
1036         /*
1037          * NOTE: Think not to put any shared-state cleanup after the call to
1038          * ProcSleep, in either the normal or failure path.  The lock state
1039          * must be fully set by the lock grantor, or by CheckDeadLock if we
1040          * give up waiting for the lock.  This is necessary because of the
1041          * possibility that a cancel/die interrupt will interrupt ProcSleep
1042          * after someone else grants us the lock, but before we've noticed it.
1043          * Hence, after granting, the locktable state must fully reflect the
1044          * fact that we own the lock; we can't do additional work on return.
1045          * Contrariwise, if we fail, any cleanup must happen in xact abort
1046          * processing, not here, to ensure it will also happen in the
1047          * cancel/die case.
1048          */
1049
1050         if (ProcSleep(lockMethodTable,
1051                                   locallock->tag.mode,
1052                                   locallock->lock,
1053                                   locallock->proclock) != STATUS_OK)
1054         {
1055                 /*
1056                  * We failed as a result of a deadlock, see CheckDeadLock(). Quit
1057                  * now.  Removal of the proclock and lock objects, if no longer
1058                  * needed, will happen in xact cleanup (see above for motivation).
1059                  */
1060                 awaitedLock = NULL;
1061                 LOCK_PRINT("WaitOnLock: aborting on lock",
1062                                    locallock->lock, locallock->tag.mode);
1063                 LWLockRelease(lockMethodTable->masterLock);
1064
1065                 /*
1066                  * Now that we aren't holding the LockMgrLock, we can give an
1067                  * error report including details about the detected deadlock.
1068                  */
1069                 DeadLockReport();
1070                 /* not reached */
1071         }
1072
1073         awaitedLock = NULL;
1074
1075         set_ps_display(old_status);
1076         pfree(old_status);
1077         pfree(new_status);
1078
1079         LOCK_PRINT("WaitOnLock: wakeup on lock",
1080                            locallock->lock, locallock->tag.mode);
1081         return STATUS_OK;
1082 }
1083
1084 /*
1085  * Remove a proc from the wait-queue it is on
1086  * (caller must know it is on one).
1087  *
1088  * Locktable lock must be held by caller.
1089  *
1090  * NB: this does not remove the process' proclock object, nor the lock object,
1091  * even though their counts might now have gone to zero.  That will happen
1092  * during a subsequent LockReleaseAll call, which we expect will happen
1093  * during transaction cleanup.  (Removal of a proc from its wait queue by
1094  * this routine can only happen if we are aborting the transaction.)
1095  */
1096 void
1097 RemoveFromWaitQueue(PGPROC *proc)
1098 {
1099         LOCK       *waitLock = proc->waitLock;
1100         LOCKMODE        lockmode = proc->waitLockMode;
1101
1102         /* Make sure proc is waiting */
1103         Assert(proc->links.next != INVALID_OFFSET);
1104         Assert(waitLock);
1105         Assert(waitLock->waitProcs.size > 0);
1106
1107         /* Remove proc from lock's wait queue */
1108         SHMQueueDelete(&(proc->links));
1109         waitLock->waitProcs.size--;
1110
1111         /* Undo increments of request counts by waiting process */
1112         Assert(waitLock->nRequested > 0);
1113         Assert(waitLock->nRequested > proc->waitLock->nGranted);
1114         waitLock->nRequested--;
1115         Assert(waitLock->requested[lockmode] > 0);
1116         waitLock->requested[lockmode]--;
1117         /* don't forget to clear waitMask bit if appropriate */
1118         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1119                 waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1120
1121         /* Clean up the proc's own state */
1122         proc->waitLock = NULL;
1123         proc->waitProcLock = NULL;
1124
1125         /* See if any other waiters for the lock can be woken up now */
1126         ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
1127 }
1128
1129 /*
1130  * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
1131  *              release one 'lockmode' lock on it.
1132  *
1133  * Side Effects: find any waiting processes that are now wakable,
1134  *              grant them their requested locks and awaken them.
1135  *              (We have to grant the lock here to avoid a race between
1136  *              the waking process and any new process to
1137  *              come along and request the lock.)
1138  */
1139 bool
1140 LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
1141                         TransactionId xid, LOCKMODE lockmode)
1142 {
1143         LOCALLOCKTAG localtag;
1144         LOCALLOCK  *locallock;
1145         LOCK       *lock;
1146         PROCLOCK   *proclock;
1147         LWLockId        masterLock;
1148         LockMethod      lockMethodTable;
1149         bool            wakeupNeeded = false;
1150
1151 #ifdef LOCK_DEBUG
1152         if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks)
1153                 elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
1154 #endif
1155
1156         /* ???????? This must be changed when short term locks will be used */
1157         locktag->lockmethodid = lockmethodid;
1158
1159         Assert(lockmethodid < NumLockMethods);
1160         lockMethodTable = LockMethods[lockmethodid];
1161         if (!lockMethodTable)
1162         {
1163                 elog(WARNING, "lockMethodTable is null in LockRelease");
1164                 return FALSE;
1165         }
1166
1167         /*
1168          * Find the LOCALLOCK entry for this lock and lockmode
1169          */
1170         MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
1171         localtag.lock = *locktag;
1172         localtag.xid = xid;
1173         localtag.mode = lockmode;
1174
1175         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
1176                                                                                   (void *) &localtag,
1177                                                                                   HASH_FIND, NULL);
1178
1179         /*
1180          * let the caller print its own error message, too. Do not
1181          * ereport(ERROR).
1182          */
1183         if (!locallock || locallock->nLocks <= 0)
1184         {
1185                 elog(WARNING, "you don't own a lock of type %s",
1186                          lock_mode_names[lockmode]);
1187                 return FALSE;
1188         }
1189
1190         /*
1191          * Decrease the count for the resource owner.
1192          */
1193         {
1194                 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1195                 ResourceOwner owner;
1196                 int                     i;
1197
1198                 /* Session locks and user locks are not transactional */
1199                 if (xid != InvalidTransactionId &&
1200                         lockmethodid == DEFAULT_LOCKMETHOD)
1201                         owner = CurrentResourceOwner;
1202                 else
1203                         owner = NULL;
1204
1205                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1206                 {
1207                         if (lockOwners[i].owner == owner)
1208                         {
1209                                 Assert(lockOwners[i].nLocks > 0);
1210                                 if (--lockOwners[i].nLocks == 0)
1211                                 {
1212                                         /* compact out unused slot */
1213                                         locallock->numLockOwners--;
1214                                         if (i < locallock->numLockOwners)
1215                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1216                                 }
1217                                 break;
1218                         }
1219                 }
1220                 if (i < 0)
1221                 {
1222                         /* don't release a lock belonging to another owner */
1223                         elog(WARNING, "you don't own a lock of type %s",
1224                                  lock_mode_names[lockmode]);
1225                         return FALSE;
1226                 }
1227         }
1228
1229         /*
1230          * Decrease the total local count.      If we're still holding the lock,
1231          * we're done.
1232          */
1233         locallock->nLocks--;
1234
1235         if (locallock->nLocks > 0)
1236                 return TRUE;
1237
1238         /*
1239          * Otherwise we've got to mess with the shared lock table.
1240          */
1241         masterLock = lockMethodTable->masterLock;
1242
1243         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1244
1245         /*
1246          * We don't need to re-find the lock or proclock, since we kept their
1247          * addresses in the locallock table, and they couldn't have been
1248          * removed while we were holding a lock on them.
1249          */
1250         lock = locallock->lock;
1251         LOCK_PRINT("LockRelease: found", lock, lockmode);
1252         proclock = locallock->proclock;
1253         PROCLOCK_PRINT("LockRelease: found", proclock);
1254
1255         /*
1256          * Double-check that we are actually holding a lock of the type we
1257          * want to release.
1258          */
1259         if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1260         {
1261                 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
1262                 LWLockRelease(masterLock);
1263                 elog(WARNING, "you don't own a lock of type %s",
1264                          lock_mode_names[lockmode]);
1265                 RemoveLocalLock(locallock);
1266                 return FALSE;
1267         }
1268         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1269         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1270         Assert(lock->nGranted <= lock->nRequested);
1271
1272         /*
1273          * fix the general lock stats
1274          */
1275         lock->nRequested--;
1276         lock->requested[lockmode]--;
1277         lock->nGranted--;
1278         lock->granted[lockmode]--;
1279
1280         if (lock->granted[lockmode] == 0)
1281         {
1282                 /* change the conflict mask.  No more of this lock type. */
1283                 lock->grantMask &= LOCKBIT_OFF(lockmode);
1284         }
1285
1286         LOCK_PRINT("LockRelease: updated", lock, lockmode);
1287         Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1288         Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1289         Assert(lock->nGranted <= lock->nRequested);
1290
1291         /*
1292          * We need only run ProcLockWakeup if the released lock conflicts with
1293          * at least one of the lock types requested by waiter(s).  Otherwise
1294          * whatever conflict made them wait must still exist.  NOTE: before
1295          * MVCC, we could skip wakeup if lock->granted[lockmode] was still
1296          * positive. But that's not true anymore, because the remaining
1297          * granted locks might belong to some waiter, who could now be
1298          * awakened because he doesn't conflict with his own locks.
1299          */
1300         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1301                 wakeupNeeded = true;
1302
1303         /*
1304          * Now fix the per-proclock state.
1305          */
1306         proclock->holdMask &= LOCKBIT_OFF(lockmode);
1307         PROCLOCK_PRINT("LockRelease: updated", proclock);
1308
1309         /*
1310          * If this was my last hold on this lock, delete my entry in the
1311          * proclock table.
1312          */
1313         if (proclock->holdMask == 0)
1314         {
1315                 PROCLOCK_PRINT("LockRelease: deleting", proclock);
1316                 SHMQueueDelete(&proclock->lockLink);
1317                 SHMQueueDelete(&proclock->procLink);
1318                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
1319                                                                                         (void *) &(proclock->tag),
1320                                                                                         HASH_REMOVE, NULL);
1321                 if (!proclock)
1322                 {
1323                         LWLockRelease(masterLock);
1324                         elog(WARNING, "proclock table corrupted");
1325                         RemoveLocalLock(locallock);
1326                         return FALSE;
1327                 }
1328         }
1329
1330         if (lock->nRequested == 0)
1331         {
1332                 /*
1333                  * We've just released the last lock, so garbage-collect the lock
1334                  * object.
1335                  */
1336                 Assert(SHMQueueEmpty(&(lock->procLocks)));
1337                 lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1338                                                                         (void *) &(lock->tag),
1339                                                                         HASH_REMOVE, NULL);
1340                 if (!lock)
1341                 {
1342                         LWLockRelease(masterLock);
1343                         elog(WARNING, "lock table corrupted");
1344                         RemoveLocalLock(locallock);
1345                         return FALSE;
1346                 }
1347         }
1348         else
1349         {
1350                 /*
1351                  * Wake up waiters if needed.
1352                  */
1353                 if (wakeupNeeded)
1354                         ProcLockWakeup(lockMethodTable, lock);
1355         }
1356
1357         LWLockRelease(masterLock);
1358
1359         RemoveLocalLock(locallock);
1360         return TRUE;
1361 }
1362
1363 /*
1364  * LockReleaseAll -- Release all locks of the specified lock method that
1365  *              are held by the current process.
1366  *
1367  * Well, not necessarily *all* locks.  The available behaviors are:
1368  *
1369  * allxids == true: release all locks regardless of transaction
1370  * affiliation.
1371  *
1372  * allxids == false: release all locks with Xid != 0
1373  * (zero is the Xid used for "session" locks).
1374  */
1375 bool
1376 LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
1377 {
1378         HASH_SEQ_STATUS status;
1379         SHM_QUEUE  *procLocks = &(MyProc->procLocks);
1380         LWLockId        masterLock;
1381         LockMethod      lockMethodTable;
1382         int                     i,
1383                                 numLockModes;
1384         LOCALLOCK  *locallock;
1385         PROCLOCK   *proclock;
1386         LOCK       *lock;
1387
1388 #ifdef LOCK_DEBUG
1389         if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1390                 elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
1391 #endif
1392
1393         Assert(lockmethodid < NumLockMethods);
1394         lockMethodTable = LockMethods[lockmethodid];
1395         if (!lockMethodTable)
1396         {
1397                 elog(WARNING, "bad lock method: %d", lockmethodid);
1398                 return FALSE;
1399         }
1400
1401         numLockModes = lockMethodTable->numLockModes;
1402         masterLock = lockMethodTable->masterLock;
1403
1404         /*
1405          * First we run through the locallock table and get rid of unwanted
1406          * entries, then we scan the process's proclocks and get rid of those.
1407          * We do this separately because we may have multiple locallock
1408          * entries pointing to the same proclock, and we daren't end up with
1409          * any dangling pointers.
1410          */
1411         hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
1412
1413         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1414         {
1415                 if (locallock->proclock == NULL || locallock->lock == NULL)
1416                 {
1417                         /*
1418                          * We must've run out of shared memory while trying to set up
1419                          * this lock.  Just forget the local entry.
1420                          */
1421                         Assert(locallock->nLocks == 0);
1422                         RemoveLocalLock(locallock);
1423                         continue;
1424                 }
1425
1426                 /* Ignore items that are not of the lockmethod to be removed */
1427                 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
1428                         continue;
1429
1430                 /*
1431                  * Ignore locks with Xid=0 unless we are asked to release all
1432                  * locks
1433                  */
1434                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
1435                         && !allxids)
1436                         continue;
1437
1438                 RemoveLocalLock(locallock);
1439         }
1440
1441         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1442
1443         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1444                                                                                  offsetof(PROCLOCK, procLink));
1445
1446         while (proclock)
1447         {
1448                 bool            wakeupNeeded = false;
1449                 PROCLOCK   *nextHolder;
1450
1451                 /* Get link first, since we may unlink/delete this proclock */
1452                 nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
1453                                                                                    offsetof(PROCLOCK, procLink));
1454
1455                 Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
1456
1457                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1458
1459                 /* Ignore items that are not of the lockmethod to be removed */
1460                 if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
1461                         goto next_item;
1462
1463                 /*
1464                  * Ignore locks with Xid=0 unless we are asked to release all
1465                  * locks
1466                  */
1467                 if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
1468                         && !allxids)
1469                         goto next_item;
1470
1471                 PROCLOCK_PRINT("LockReleaseAll", proclock);
1472                 LOCK_PRINT("LockReleaseAll", lock, 0);
1473                 Assert(lock->nRequested >= 0);
1474                 Assert(lock->nGranted >= 0);
1475                 Assert(lock->nGranted <= lock->nRequested);
1476                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
1477
1478                 /*
1479                  * fix the general lock stats
1480                  */
1481                 if (proclock->holdMask)
1482                 {
1483                         for (i = 1; i <= numLockModes; i++)
1484                         {
1485                                 if (proclock->holdMask & LOCKBIT_ON(i))
1486                                 {
1487                                         lock->requested[i]--;
1488                                         lock->granted[i]--;
1489                                         Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
1490                                         if (lock->granted[i] == 0)
1491                                                 lock->grantMask &= LOCKBIT_OFF(i);
1492                                         lock->nRequested--;
1493                                         lock->nGranted--;
1494
1495                                         /*
1496                                          * Read comments in LockRelease
1497                                          */
1498                                         if (!wakeupNeeded &&
1499                                                 lockMethodTable->conflictTab[i] & lock->waitMask)
1500                                                 wakeupNeeded = true;
1501                                 }
1502                         }
1503                 }
1504                 Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1505                 Assert(lock->nGranted <= lock->nRequested);
1506                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1507
1508                 PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
1509
1510                 /*
1511                  * Remove the proclock entry from the linked lists
1512                  */
1513                 SHMQueueDelete(&proclock->lockLink);
1514                 SHMQueueDelete(&proclock->procLink);
1515
1516                 /*
1517                  * remove the proclock entry from the hashtable
1518                  */
1519                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
1520                                                                                         (void *) &(proclock->tag),
1521                                                                                         HASH_REMOVE,
1522                                                                                         NULL);
1523                 if (!proclock)
1524                 {
1525                         LWLockRelease(masterLock);
1526                         elog(WARNING, "proclock table corrupted");
1527                         return FALSE;
1528                 }
1529
1530                 if (lock->nRequested == 0)
1531                 {
1532                         /*
1533                          * We've just released the last lock, so garbage-collect the
1534                          * lock object.
1535                          */
1536                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1537                         Assert(SHMQueueEmpty(&(lock->procLocks)));
1538                         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1539                                                                                 (void *) &(lock->tag),
1540                                                                                 HASH_REMOVE, NULL);
1541                         if (!lock)
1542                         {
1543                                 LWLockRelease(masterLock);
1544                                 elog(WARNING, "lock table corrupted");
1545                                 return FALSE;
1546                         }
1547                 }
1548                 else if (wakeupNeeded)
1549                         ProcLockWakeup(lockMethodTable, lock);
1550
1551 next_item:
1552                 proclock = nextHolder;
1553         }
1554
1555         LWLockRelease(masterLock);
1556
1557 #ifdef LOCK_DEBUG
1558         if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1559                 elog(LOG, "LockReleaseAll done");
1560 #endif
1561
1562         return TRUE;
1563 }
1564
1565 /*
1566  * LockReleaseCurrentOwner
1567  *              Release all locks belonging to CurrentResourceOwner
1568  *
1569  * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner.
1570  */
1571 void
1572 LockReleaseCurrentOwner(void)
1573 {
1574         HASH_SEQ_STATUS status;
1575         LOCALLOCK  *locallock;
1576         LOCALLOCKOWNER *lockOwners;
1577         int                     i;
1578
1579         hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
1580
1581         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1582         {
1583                 /* Ignore items that must be nontransactional */
1584                 if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
1585                         continue;
1586                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
1587                         continue;
1588
1589                 /* Scan to see if there are any locks belonging to current owner */
1590                 lockOwners = locallock->lockOwners;
1591                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1592                 {
1593                         if (lockOwners[i].owner == CurrentResourceOwner)
1594                         {
1595                                 Assert(lockOwners[i].nLocks > 0);
1596                                 if (lockOwners[i].nLocks < locallock->nLocks)
1597                                 {
1598                                         /*
1599                                          * We will still hold this lock after forgetting this
1600                                          * ResourceOwner.
1601                                          */
1602                                         locallock->nLocks -= lockOwners[i].nLocks;
1603                                         /* compact out unused slot */
1604                                         locallock->numLockOwners--;
1605                                         if (i < locallock->numLockOwners)
1606                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1607                                 }
1608                                 else
1609                                 {
1610                                         Assert(lockOwners[i].nLocks == locallock->nLocks);
1611                                         /* We want to call LockRelease just once */
1612                                         lockOwners[i].nLocks = 1;
1613                                         locallock->nLocks = 1;
1614                                         if (!LockRelease(DEFAULT_LOCKMETHOD,
1615                                                                          &locallock->tag.lock,
1616                                                                          locallock->tag.xid,
1617                                                                          locallock->tag.mode))
1618                                                 elog(WARNING, "LockReleaseCurrentOwner: failed??");
1619                                 }
1620                                 break;
1621                         }
1622                 }
1623         }
1624 }
1625
1626 /*
1627  * LockReassignCurrentOwner
1628  *              Reassign all locks belonging to CurrentResourceOwner to belong
1629  *              to its parent resource owner
1630  */
1631 void
1632 LockReassignCurrentOwner(void)
1633 {
1634         ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
1635         HASH_SEQ_STATUS status;
1636         LOCALLOCK  *locallock;
1637         LOCALLOCKOWNER *lockOwners;
1638
1639         Assert(parent != NULL);
1640
1641         hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
1642
1643         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1644         {
1645                 int                     i;
1646                 int                     ic = -1;
1647                 int                     ip = -1;
1648
1649                 /* Ignore items that must be nontransactional */
1650                 if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
1651                         continue;
1652                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
1653                         continue;
1654
1655                 /*
1656                  * Scan to see if there are any locks belonging to current owner
1657                  * or its parent
1658                  */
1659                 lockOwners = locallock->lockOwners;
1660                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1661                 {
1662                         if (lockOwners[i].owner == CurrentResourceOwner)
1663                                 ic = i;
1664                         else if (lockOwners[i].owner == parent)
1665                                 ip = i;
1666                 }
1667
1668                 if (ic < 0)
1669                         continue;                       /* no current locks */
1670
1671                 if (ip < 0)
1672                 {
1673                         /* Parent has no slot, so just give it child's slot */
1674                         lockOwners[ic].owner = parent;
1675                 }
1676                 else
1677                 {
1678                         /* Merge child's count with parent's */
1679                         lockOwners[ip].nLocks += lockOwners[ic].nLocks;
1680                         /* compact out unused slot */
1681                         locallock->numLockOwners--;
1682                         if (ic < locallock->numLockOwners)
1683                                 lockOwners[ic] = lockOwners[locallock->numLockOwners];
1684                 }
1685         }
1686 }
1687
1688
1689 /*
1690  * Estimate shared-memory space used for lock tables
1691  */
1692 int
1693 LockShmemSize(int maxBackends)
1694 {
1695         int                     size = 0;
1696         long            max_table_size = NLOCKENTS(maxBackends);
1697
1698         /* lock method headers */
1699         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
1700
1701         /* lockHash table */
1702         size += hash_estimate_size(max_table_size, sizeof(LOCK));
1703
1704         /* proclockHash table */
1705         size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
1706
1707         /*
1708          * Note we count only one pair of hash tables, since the userlocks
1709          * table actually overlays the main one.
1710          *
1711          * Since the lockHash entry count above is only an estimate, add 10%
1712          * safety margin.
1713          */
1714         size += size / 10;
1715
1716         return size;
1717 }
1718
1719 /*
1720  * GetLockStatusData - Return a summary of the lock manager's internal
1721  * status, for use in a user-level reporting function.
1722  *
1723  * The return data consists of an array of PROCLOCK objects, with the
1724  * associated PGPROC and LOCK objects for each.  Note that multiple
1725  * copies of the same PGPROC and/or LOCK objects are likely to appear.
1726  * It is the caller's responsibility to match up duplicates if wanted.
1727  *
1728  * The design goal is to hold the LockMgrLock for as short a time as possible;
1729  * thus, this function simply makes a copy of the necessary data and releases
1730  * the lock, allowing the caller to contemplate and format the data for as
1731  * long as it pleases.
1732  */
1733 LockData *
1734 GetLockStatusData(void)
1735 {
1736         LockData   *data;
1737         HTAB       *proclockTable;
1738         PROCLOCK   *proclock;
1739         HASH_SEQ_STATUS seqstat;
1740         int                     i;
1741
1742         data = (LockData *) palloc(sizeof(LockData));
1743
1744         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
1745
1746         proclockTable = LockMethodProcLockHash[DEFAULT_LOCKMETHOD];
1747
1748         data->nelements = i = proclockTable->hctl->nentries;
1749
1750         data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
1751         data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
1752         data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
1753         data->locks = (LOCK *) palloc(sizeof(LOCK) * i);
1754
1755         hash_seq_init(&seqstat, proclockTable);
1756
1757         i = 0;
1758         while ((proclock = hash_seq_search(&seqstat)))
1759         {
1760                 PGPROC     *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
1761                 LOCK       *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1762
1763                 data->proclockaddrs[i] = MAKE_OFFSET(proclock);
1764                 memcpy(&(data->proclocks[i]), proclock, sizeof(PROCLOCK));
1765                 memcpy(&(data->procs[i]), proc, sizeof(PGPROC));
1766                 memcpy(&(data->locks[i]), lock, sizeof(LOCK));
1767
1768                 i++;
1769         }
1770
1771         LWLockRelease(LockMgrLock);
1772
1773         Assert(i == data->nelements);
1774
1775         return data;
1776 }
1777
1778 /* Provide the textual name of any lock mode */
1779 const char *
1780 GetLockmodeName(LOCKMODE mode)
1781 {
1782         Assert(mode <= MAX_LOCKMODES);
1783         return lock_mode_names[mode];
1784 }
1785
1786 #ifdef LOCK_DEBUG
1787 /*
1788  * Dump all locks in the MyProc->procLocks list.
1789  *
1790  * Must have already acquired the masterLock.
1791  */
1792 void
1793 DumpLocks(void)
1794 {
1795         PGPROC     *proc;
1796         SHM_QUEUE  *procLocks;
1797         PROCLOCK   *proclock;
1798         LOCK       *lock;
1799         int                     lockmethodid = DEFAULT_LOCKMETHOD;
1800         LockMethod      lockMethodTable;
1801
1802         proc = MyProc;
1803         if (proc == NULL)
1804                 return;
1805
1806         procLocks = &proc->procLocks;
1807
1808         Assert(lockmethodid < NumLockMethods);
1809         lockMethodTable = LockMethods[lockmethodid];
1810         if (!lockMethodTable)
1811                 return;
1812
1813         if (proc->waitLock)
1814                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1815
1816         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1817                                                                                  offsetof(PROCLOCK, procLink));
1818
1819         while (proclock)
1820         {
1821                 Assert(proclock->tag.proc == MAKE_OFFSET(proc));
1822
1823                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1824
1825                 PROCLOCK_PRINT("DumpLocks", proclock);
1826                 LOCK_PRINT("DumpLocks", lock, 0);
1827
1828                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
1829                                                                                    offsetof(PROCLOCK, procLink));
1830         }
1831 }
1832
1833 /*
1834  * Dump all postgres locks. Must have already acquired the masterLock.
1835  */
1836 void
1837 DumpAllLocks(void)
1838 {
1839         PGPROC     *proc;
1840         PROCLOCK   *proclock;
1841         LOCK       *lock;
1842         int                     lockmethodid = DEFAULT_LOCKMETHOD;
1843         LockMethod      lockMethodTable;
1844         HTAB       *proclockTable;
1845         HASH_SEQ_STATUS status;
1846
1847         proc = MyProc;
1848         if (proc == NULL)
1849                 return;
1850
1851         Assert(lockmethodid < NumLockMethods);
1852         lockMethodTable = LockMethods[lockmethodid];
1853         if (!lockMethodTable)
1854                 return;
1855
1856         proclockTable = LockMethodProcLockHash[lockmethodid];
1857
1858         if (proc->waitLock)
1859                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1860
1861         hash_seq_init(&status, proclockTable);
1862         while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
1863         {
1864                 PROCLOCK_PRINT("DumpAllLocks", proclock);
1865
1866                 if (proclock->tag.lock)
1867                 {
1868                         lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1869                         LOCK_PRINT("DumpAllLocks", lock, 0);
1870                 }
1871                 else
1872                         elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
1873         }
1874 }
1875
1876 #endif   /* LOCK_DEBUG */