]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Refactor some duplicated code in lock.c: create UnGrantLock(), move code
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.146 2005/02/04 02:04:53 neilc Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll(),
27  *      LockCheckConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include "postgres.h"
32
33 #include <signal.h>
34 #include <unistd.h>
35
36 #include "access/xact.h"
37 #include "miscadmin.h"
38 #include "storage/proc.h"
39 #include "utils/memutils.h"
40 #include "utils/ps_status.h"
41 #include "utils/resowner.h"
42
43
44 /* This configuration variable is used to set the lock table size */
45 int                     max_locks_per_xact; /* set by guc.c */
46
47 #define NLOCKENTS(maxBackends)  (max_locks_per_xact * (maxBackends))
48
49
50 /*
51  * map from lock method id to the lock table data structures
52  */
53 static LockMethod LockMethods[MAX_LOCK_METHODS];
54 static HTAB *LockMethodLockHash[MAX_LOCK_METHODS];
55 static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS];
56 static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS];
57
58 /* exported so lmgr.c can initialize it */
59 int                     NumLockMethods;
60
61
62 /* private state for GrantAwaitedLock */
63 static LOCALLOCK *awaitedLock;
64 static ResourceOwner awaitedOwner;
65
66
/*
 * Printable names of the lock modes, indexed by LOCKMODE value.
 *
 * Slot 0 holds the placeholder "INVALID"; the named modes occupy slots
 * 1 and up.  The table is only consulted by the conditional trace/debug
 * output in this file (LOCK_DEBUG and CHECK_DEADLOCK_RISK code paths).
 */
static const char *const lock_mode_names[] =
{
        "INVALID",
        "AccessShareLock",
        "RowShareLock",
        "RowExclusiveLock",
        "ShareUpdateExclusiveLock",
        "ShareLock",
        "ShareRowExclusiveLock",
        "ExclusiveLock",
        "AccessExclusiveLock"
};
79
80
81 #ifdef LOCK_DEBUG
82
83 /*------
84  * The following configuration options are available for lock debugging:
85  *
86  *         TRACE_LOCKS          -- give a bunch of output about what's going on in this file
87  *         TRACE_USERLOCKS      -- same but for user locks
88  *         TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
89  *                                                 (use to avoid output on system tables)
90  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
91  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
92  *
93  * Furthermore, but in storage/lmgr/lwlock.c:
94  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
95  *
96  * Define LOCK_DEBUG at compile time to get all these enabled.
97  * --------
98  */
99
100 int                     Trace_lock_oidmin = BootstrapObjectIdData;
101 bool            Trace_locks = false;
102 bool            Trace_userlocks = false;
103 int                     Trace_lock_table = 0;
104 bool            Debug_deadlocks = false;
105
106
107 inline static bool
108 LOCK_DEBUG_ENABLED(const LOCK *lock)
109 {
110         return
111                 (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
112            || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
113                  && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
114                 || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
115 }
116
117
118 inline static void
119 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
120 {
121         if (LOCK_DEBUG_ENABLED(lock))
122                 elog(LOG,
123                          "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
124                          "req(%d,%d,%d,%d,%d,%d,%d)=%d "
125                          "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
126                          where, MAKE_OFFSET(lock),
127                          lock->tag.lockmethodid, lock->tag.relId, lock->tag.dbId,
128                          lock->tag.objId.blkno, lock->grantMask,
129                          lock->requested[1], lock->requested[2], lock->requested[3],
130                          lock->requested[4], lock->requested[5], lock->requested[6],
131                          lock->requested[7], lock->nRequested,
132                          lock->granted[1], lock->granted[2], lock->granted[3],
133                          lock->granted[4], lock->granted[5], lock->granted[6],
134                          lock->granted[7], lock->nGranted,
135                          lock->waitProcs.size, lock_mode_names[type]);
136 }
137
138
139 inline static void
140 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
141 {
142         if (
143                 (((PROCLOCK_LOCKMETHOD(*proclockP) == DEFAULT_LOCKMETHOD && Trace_locks)
144                   || (PROCLOCK_LOCKMETHOD(*proclockP) == USER_LOCKMETHOD && Trace_userlocks))
145                  && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
146                 || (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table))
147                 )
148                 elog(LOG,
149                 "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%x)",
150                          where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
151                          PROCLOCK_LOCKMETHOD(*(proclockP)),
152                          proclockP->tag.proc, proclockP->tag.xid,
153                          (int) proclockP->holdMask);
154 }
155
156 #else                                                   /* not LOCK_DEBUG */
157
158 #define LOCK_PRINT(where, lock, type)
159 #define PROCLOCK_PRINT(where, proclockP)
160 #endif   /* not LOCK_DEBUG */
161
162
163 static void RemoveLocalLock(LOCALLOCK *locallock);
164 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
165 static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
166                    ResourceOwner owner);
167 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
168                                  int *myHolding);
169 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
170                                                 PROCLOCK *proclock, LockMethod lockMethodTable);
171
172
/*
 * InitLocks -- Init the lock module.
 *
 * The body is intentionally empty: there is currently nothing to set up
 * here, and no data structure is created by this function.
 * NOTE(review): callers are not visible in this file; presumably it is
 * retained so existing initialization call sites keep working -- confirm
 * before removing.
 */
void
InitLocks(void)
{
        /* NOP */
}
182
183
184 /*
185  * Fetch the lock method table associated with a given lock
186  */
187 LockMethod
188 GetLocksMethodTable(LOCK *lock)
189 {
190         LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
191
192         Assert(0 < lockmethodid && lockmethodid < NumLockMethods);
193         return LockMethods[lockmethodid];
194 }
195
196
197 /*
198  * LockMethodInit -- initialize the lock table's lock type
199  *              structures
200  *
201  * Notes: just copying.  Should only be called once.
202  */
203 static void
204 LockMethodInit(LockMethod lockMethodTable,
205                            const LOCKMASK *conflictsP,
206                            int numModes)
207 {
208         int                     i;
209
210         lockMethodTable->numLockModes = numModes;
211         /* copies useless zero element as well as the N lockmodes */
212         for (i = 0; i <= numModes; i++)
213                 lockMethodTable->conflictTab[i] = conflictsP[i];
214 }
215
216 /*
217  * LockMethodTableInit -- initialize a lock table structure
218  *
219  * NOTE: data structures allocated here are allocated permanently, using
220  * TopMemoryContext and shared memory.  We don't ever release them anyway,
221  * and in normal multi-backend operation the lock table structures set up
222  * by the postmaster are inherited by each backend, so they must be in
223  * TopMemoryContext.
224  */
225 LOCKMETHODID
226 LockMethodTableInit(const char *tabName,
227                                         const LOCKMASK *conflictsP,
228                                         int numModes,
229                                         int maxBackends)
230 {
231         LockMethod      newLockMethod;
232         LOCKMETHODID lockmethodid;
233         char       *shmemName;
234         HASHCTL         info;
235         int                     hash_flags;
236         bool            found;
237         long            init_table_size,
238                                 max_table_size;
239
240         if (numModes >= MAX_LOCKMODES)
241                 elog(ERROR, "too many lock types %d (limit is %d)",
242                          numModes, MAX_LOCKMODES - 1);
243
244         /* Compute init/max size to request for lock hashtables */
245         max_table_size = NLOCKENTS(maxBackends);
246         init_table_size = max_table_size / 2;
247
248         /* Allocate a string for the shmem index table lookups. */
249         /* This is just temp space in this routine, so palloc is OK. */
250         shmemName = (char *) palloc(strlen(tabName) + 32);
251
252         /* each lock table has a header in shared memory */
253         sprintf(shmemName, "%s (lock method table)", tabName);
254         newLockMethod = (LockMethod)
255                 ShmemInitStruct(shmemName, sizeof(LockMethodData), &found);
256
257         if (!newLockMethod)
258                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
259
260         /*
261          * we're first - initialize
262          */
263         if (!found)
264         {
265                 MemSet(newLockMethod, 0, sizeof(LockMethodData));
266                 newLockMethod->masterLock = LockMgrLock;
267                 LockMethodInit(newLockMethod, conflictsP, numModes);
268         }
269
270         /*
271          * other modules refer to the lock table by a lockmethod ID
272          */
273         Assert(NumLockMethods < MAX_LOCK_METHODS);
274         lockmethodid = NumLockMethods++;
275         LockMethods[lockmethodid] = newLockMethod;
276
277         /*
278          * allocate a hash table for LOCK structs.      This is used to store
279          * per-locked-object information.
280          */
281         MemSet(&info, 0, sizeof(info));
282         info.keysize = sizeof(LOCKTAG);
283         info.entrysize = sizeof(LOCK);
284         info.hash = tag_hash;
285         hash_flags = (HASH_ELEM | HASH_FUNCTION);
286
287         sprintf(shmemName, "%s (lock hash)", tabName);
288         LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName,
289                                                                                                          init_table_size,
290                                                                                                          max_table_size,
291                                                                                                          &info,
292                                                                                                          hash_flags);
293
294         if (!LockMethodLockHash[lockmethodid])
295                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
296
297         /*
298          * allocate a hash table for PROCLOCK structs.  This is used to store
299          * per-lock-holder information.
300          */
301         info.keysize = sizeof(PROCLOCKTAG);
302         info.entrysize = sizeof(PROCLOCK);
303         info.hash = tag_hash;
304         hash_flags = (HASH_ELEM | HASH_FUNCTION);
305
306         sprintf(shmemName, "%s (proclock hash)", tabName);
307         LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName,
308                                                                                                                  init_table_size,
309                                                                                                                  max_table_size,
310                                                                                                                  &info,
311                                                                                                                  hash_flags);
312
313         if (!LockMethodProcLockHash[lockmethodid])
314                 elog(FATAL, "could not initialize lock table \"%s\"", tabName);
315
316         /*
317          * allocate a non-shared hash table for LOCALLOCK structs.      This is
318          * used to store lock counts and resource owner information.
319          *
320          * The non-shared table could already exist in this process (this occurs
321          * when the postmaster is recreating shared memory after a backend
322          * crash). If so, delete and recreate it.  (We could simply leave it,
323          * since it ought to be empty in the postmaster, but for safety let's
324          * zap it.)
325          */
326         if (LockMethodLocalHash[lockmethodid])
327                 hash_destroy(LockMethodLocalHash[lockmethodid]);
328
329         info.keysize = sizeof(LOCALLOCKTAG);
330         info.entrysize = sizeof(LOCALLOCK);
331         info.hash = tag_hash;
332         hash_flags = (HASH_ELEM | HASH_FUNCTION);
333
334         sprintf(shmemName, "%s (locallock hash)", tabName);
335         LockMethodLocalHash[lockmethodid] = hash_create(shmemName,
336                                                                                                         128,
337                                                                                                         &info,
338                                                                                                         hash_flags);
339
340         pfree(shmemName);
341
342         return lockmethodid;
343 }
344
345 /*
346  * LockMethodTableRename -- allocate another lockmethod ID to the same
347  *              lock table.
348  *
349  * NOTES: Both the lock module and the lock chain (lchain.c)
350  *              module use table id's to distinguish between different
351  *              kinds of locks.  Short term and long term locks look
352  *              the same to the lock table, but are handled differently
353  *              by the lock chain manager.      This function allows the
354  *              client to use different lockmethods when acquiring/releasing
355  *              short term and long term locks, yet store them all in one hashtable.
356  */
357
358 LOCKMETHODID
359 LockMethodTableRename(LOCKMETHODID lockmethodid)
360 {
361         LOCKMETHODID newLockMethodId;
362
363         if (NumLockMethods >= MAX_LOCK_METHODS)
364                 return INVALID_LOCKMETHOD;
365         if (LockMethods[lockmethodid] == INVALID_LOCKMETHOD)
366                 return INVALID_LOCKMETHOD;
367
368         /* other modules refer to the lock table by a lockmethod ID */
369         newLockMethodId = NumLockMethods;
370         NumLockMethods++;
371
372         LockMethods[newLockMethodId] = LockMethods[lockmethodid];
373         LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid];
374         LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid];
375         LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid];
376
377         return newLockMethodId;
378 }
379
380 /*
381  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
382  *              set lock if/when no conflicts.
383  *
384  * Returns: TRUE if lock was acquired, FALSE otherwise.  Note that
385  *              a FALSE return is to be expected if dontWait is TRUE;
386  *              but if dontWait is FALSE, only a parameter error can cause
387  *              a FALSE return.  (XXX probably we should just ereport on parameter
388  *              errors, instead of conflating this with failure to acquire lock?)
389  *
390  * Side Effects: The lock is acquired and recorded in lock tables.
391  *
392  * NOTE: if we wait for the lock, there is no way to abort the wait
393  * short of aborting the transaction.
394  *
395  *
396  * Note on User Locks:
397  *
398  *              User locks are handled totally on the application side as
399  *              long term cooperative locks which extend beyond the normal
400  *              transaction boundaries.  Their purpose is to indicate to an
401  *              application that someone is `working' on an item.  So it is
402  *              possible to put a user lock on a tuple's oid, retrieve the
403  *              tuple, work on it for an hour and then update it and remove
404  *              the lock.  While the lock is active other clients can still
405  *              read and write the tuple but they can be aware that it has
406  *              been locked at the application level by someone.
407  *              User locks use lock tags made of a uint16 and a uint32, for
408  *              example 0 and a tuple oid, or any other arbitrary pair of
409  *              numbers following a convention established by the application.
410  *              In this sense tags don't refer to tuples or database entities.
411  *              User locks and normal locks are completely orthogonal and
412  *              they don't interfere with each other, so it is possible
413  *              to acquire a normal lock on a user-locked tuple or user-lock
414  *              a tuple for which a normal write lock already exists.
415  *              User locks are always non blocking, therefore they are never
416  *              acquired if already held by another process.  They must be
417  *              released explicitly by the application but they are released
418  *              automatically when a backend terminates.
419  *              They are indicated by a lockmethod 2 which is an alias for the
420  *              normal lock table, and are distinguished from normal locks
421  *              by the following differences:
422  *
423  *                                                                              normal lock             user lock
424  *
425  *              lockmethodid                                    1                               2
426  *              tag.dbId                                                database oid    database oid
427  *              tag.relId                                               rel oid or 0    0
428  *              tag.objId                                               block id                lock id2
429  *                                                                              or xact id
430  *              tag.offnum                                              0                               lock id1
431  *              proclock.xid                                    xid or 0                0
432  *              persistence                                             transaction             user or backend
433  *                                                                              or backend
434  *
435  *              The lockmode parameter can have the same values for normal locks
436  *              although probably only WRITE_LOCK can have some practical use.
437  *
438  *                                                                                                              DZ - 22 Nov 1997
439  */
440
441 bool
442 LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
443                         TransactionId xid, LOCKMODE lockmode, bool dontWait)
444 {
445         LOCALLOCKTAG localtag;
446         LOCALLOCK  *locallock;
447         LOCK       *lock;
448         PROCLOCK   *proclock;
449         PROCLOCKTAG proclocktag;
450         bool            found;
451         ResourceOwner owner;
452         LWLockId        masterLock;
453         LockMethod      lockMethodTable;
454         int                     status;
455         int                     myHolding[MAX_LOCKMODES];
456         int                     i;
457
458 #ifdef LOCK_DEBUG
459         if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks)
460                 elog(LOG, "LockAcquire: user lock [%u] %s",
461                          locktag->objId.blkno, lock_mode_names[lockmode]);
462 #endif
463
464         /* ???????? This must be changed when short term locks will be used */
465         locktag->lockmethodid = lockmethodid;
466
467         Assert(lockmethodid < NumLockMethods);
468         lockMethodTable = LockMethods[lockmethodid];
469         if (!lockMethodTable)
470         {
471                 elog(WARNING, "bad lock table id: %d", lockmethodid);
472                 return FALSE;
473         }
474
475         /* Session locks and user locks are not transactional */
476         if (xid != InvalidTransactionId &&
477                 lockmethodid == DEFAULT_LOCKMETHOD)
478                 owner = CurrentResourceOwner;
479         else
480                 owner = NULL;
481
482         /*
483          * Find or create a LOCALLOCK entry for this lock and lockmode
484          */
485         MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
486         localtag.lock = *locktag;
487         localtag.xid = xid;
488         localtag.mode = lockmode;
489
490         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
491                                                                                   (void *) &localtag,
492                                                                                   HASH_ENTER, &found);
493         if (!locallock)
494                 ereport(ERROR,
495                                 (errcode(ERRCODE_OUT_OF_MEMORY),
496                                  errmsg("out of memory")));
497
498         /*
499          * if it's a new locallock object, initialize it
500          */
501         if (!found)
502         {
503                 locallock->lock = NULL;
504                 locallock->proclock = NULL;
505                 locallock->nLocks = 0;
506                 locallock->numLockOwners = 0;
507                 locallock->maxLockOwners = 8;
508                 locallock->lockOwners = NULL;
509                 locallock->lockOwners = (LOCALLOCKOWNER *)
510                         MemoryContextAlloc(TopMemoryContext,
511                                           locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
512         }
513         else
514         {
515                 /* Make sure there will be room to remember the lock */
516                 if (locallock->numLockOwners >= locallock->maxLockOwners)
517                 {
518                         int                     newsize = locallock->maxLockOwners * 2;
519
520                         locallock->lockOwners = (LOCALLOCKOWNER *)
521                                 repalloc(locallock->lockOwners,
522                                                  newsize * sizeof(LOCALLOCKOWNER));
523                         locallock->maxLockOwners = newsize;
524                 }
525         }
526
527         /*
528          * If we already hold the lock, we can just increase the count
529          * locally.
530          */
531         if (locallock->nLocks > 0)
532         {
533                 GrantLockLocal(locallock, owner);
534                 return TRUE;
535         }
536
537         /*
538          * Otherwise we've got to mess with the shared lock table.
539          */
540         masterLock = lockMethodTable->masterLock;
541
542         LWLockAcquire(masterLock, LW_EXCLUSIVE);
543
544         /*
545          * Find or create a lock with this tag.
546          *
547          * Note: if the locallock object already existed, it might have a pointer
548          * to the lock already ... but we probably should not assume that that
549          * pointer is valid, since a lock object with no locks can go away
550          * anytime.
551          */
552         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
553                                                                 (void *) locktag,
554                                                                 HASH_ENTER, &found);
555         if (!lock)
556         {
557                 LWLockRelease(masterLock);
558                 ereport(ERROR,
559                                 (errcode(ERRCODE_OUT_OF_MEMORY),
560                                  errmsg("out of shared memory"),
561                 errhint("You may need to increase max_locks_per_transaction.")));
562         }
563         locallock->lock = lock;
564
565         /*
566          * if it's a new lock object, initialize it
567          */
568         if (!found)
569         {
570                 lock->grantMask = 0;
571                 lock->waitMask = 0;
572                 SHMQueueInit(&(lock->procLocks));
573                 ProcQueueInit(&(lock->waitProcs));
574                 lock->nRequested = 0;
575                 lock->nGranted = 0;
576                 MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
577                 MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
578                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
579         }
580         else
581         {
582                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
583                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
584                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
585                 Assert(lock->nGranted <= lock->nRequested);
586         }
587
588         /*
589          * Create the hash key for the proclock table.
590          */
591         MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG));           /* must clear padding */
592         proclocktag.lock = MAKE_OFFSET(lock);
593         proclocktag.proc = MAKE_OFFSET(MyProc);
594         TransactionIdStore(xid, &proclocktag.xid);
595
596         /*
597          * Find or create a proclock entry with this tag
598          */
599         proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
600                                                                                 (void *) &proclocktag,
601                                                                                 HASH_ENTER, &found);
602         if (!proclock)
603         {
604                 /* Ooops, not enough shmem for the proclock */
605                 if (lock->nRequested == 0)
606                 {
607                         /*
608                          * There are no other requestors of this lock, so garbage-collect
609                          * the lock object.  We *must* do this to avoid a permanent leak
610                          * of shared memory, because there won't be anything to cause
611                          * anyone to release the lock object later.
612                          */
613                         Assert(SHMQueueEmpty(&(lock->procLocks)));
614                         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
615                                                                                 (void *) &(lock->tag),
616                                                                                 HASH_REMOVE, NULL);
617                 }
618                 LWLockRelease(masterLock);
619                 if (!lock)                              /* hash remove failed? */
620                         elog(WARNING, "lock table corrupted");
621                 ereport(ERROR,
622                                 (errcode(ERRCODE_OUT_OF_MEMORY),
623                                  errmsg("out of shared memory"),
624                 errhint("You may need to increase max_locks_per_transaction.")));
625         }
626         locallock->proclock = proclock;
627
628         /*
629          * If new, initialize the new entry
630          */
631         if (!found)
632         {
633                 proclock->holdMask = 0;
634                 /* Add proclock to appropriate lists */
635                 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
636                 SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
637                 PROCLOCK_PRINT("LockAcquire: new", proclock);
638         }
639         else
640         {
641                 PROCLOCK_PRINT("LockAcquire: found", proclock);
642                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
643
644 #ifdef CHECK_DEADLOCK_RISK
645
646                 /*
647                  * Issue warning if we already hold a lower-level lock on this
648                  * object and do not hold a lock of the requested level or higher.
649                  * This indicates a deadlock-prone coding practice (eg, we'd have
650                  * a deadlock if another backend were following the same code path
651                  * at about the same time).
652                  *
653                  * This is not enabled by default, because it may generate log
654                  * entries about user-level coding practices that are in fact safe
655                  * in context. It can be enabled to help find system-level
656                  * problems.
657                  *
658                  * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
659                  * better to use a table.  For now, though, this works.
660                  */
661                 for (i = lockMethodTable->numLockModes; i > 0; i--)
662                 {
663                         if (proclock->holdMask & LOCKBIT_ON(i))
664                         {
665                                 if (i >= (int) lockmode)
666                                         break;          /* safe: we have a lock >= req level */
667                                 elog(LOG, "deadlock risk: raising lock level"
668                                          " from %s to %s on object %u/%u/%u",
669                                          lock_mode_names[i], lock_mode_names[lockmode],
670                                  lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
671                                 break;
672                         }
673                 }
674 #endif   /* CHECK_DEADLOCK_RISK */
675         }
676
677         /*
678          * lock->nRequested and lock->requested[] count the total number of
679          * requests, whether granted or waiting, so increment those
680          * immediately. The other counts don't increment till we get the lock.
681          */
682         lock->nRequested++;
683         lock->requested[lockmode]++;
684         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
685
686         /*
687          * If this process (under any XID) is a holder of the lock, just grant
688          * myself another one without blocking.
689          */
690         LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
691         if (myHolding[lockmode] > 0)
692         {
693                 GrantLock(lock, proclock, lockmode);
694                 GrantLockLocal(locallock, owner);
695                 PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
696                 LWLockRelease(masterLock);
697                 return TRUE;
698         }
699
700         /*
701          * If lock requested conflicts with locks requested by waiters, must
702          * join wait queue.  Otherwise, check for conflict with already-held
703          * locks.  (That's last because most complex check.)
704          */
705         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
706                 status = STATUS_FOUND;
707         else
708                 status = LockCheckConflicts(lockMethodTable, lockmode,
709                                                                         lock, proclock,
710                                                                         MyProc, myHolding);
711
712         if (status == STATUS_OK)
713         {
714                 /* No conflict with held or previously requested locks */
715                 GrantLock(lock, proclock, lockmode);
716                 GrantLockLocal(locallock, owner);
717         }
718         else
719         {
720                 Assert(status == STATUS_FOUND);
721
722                 /*
723                  * We can't acquire the lock immediately.  If caller specified no
724                  * blocking, remove useless table entries and return FALSE without
725                  * waiting.
726                  */
727                 if (dontWait)
728                 {
729                         if (proclock->holdMask == 0)
730                         {
731                                 SHMQueueDelete(&proclock->lockLink);
732                                 SHMQueueDelete(&proclock->procLink);
733                                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
734                                                                                            (void *) &(proclock->tag),
735                                                                                                         HASH_REMOVE, NULL);
736                                 if (!proclock)
737                                         elog(WARNING, "proclock table corrupted");
738                         }
739                         else
740                                 PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
741                         lock->nRequested--;
742                         lock->requested[lockmode]--;
743                         LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
744                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
745                         Assert(lock->nGranted <= lock->nRequested);
746                         LWLockRelease(masterLock);
747                         if (locallock->nLocks == 0)
748                                 RemoveLocalLock(locallock);
749                         return FALSE;
750                 }
751
752                 /*
753                  * Construct bitmask of locks this process holds on this object.
754                  */
755                 {
756                         LOCKMASK        heldLocks = 0;
757
758                         for (i = 1; i <= lockMethodTable->numLockModes; i++)
759                         {
760                                 if (myHolding[i] > 0)
761                                         heldLocks |= LOCKBIT_ON(i);
762                         }
763                         MyProc->heldLocks = heldLocks;
764                 }
765
766                 /*
767                  * Sleep till someone wakes me up.
768                  */
769                 status = WaitOnLock(lockmethodid, locallock, owner);
770
771                 /*
772                  * NOTE: do not do any material change of state between here and
773                  * return.      All required changes in locktable state must have been
774                  * done when the lock was granted to us --- see notes in
775                  * WaitOnLock.
776                  */
777
778                 /*
779                  * Check the proclock entry status, in case something in the ipc
780                  * communication doesn't work correctly.
781                  */
782                 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
783                 {
784                         PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
785                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
786                         /* Should we retry ? */
787                         LWLockRelease(masterLock);
788                         return FALSE;
789                 }
790                 PROCLOCK_PRINT("LockAcquire: granted", proclock);
791                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
792         }
793
794         LWLockRelease(masterLock);
795
796         return status == STATUS_OK;
797 }
798
799 /*
800  * Subroutine to free a locallock entry
801  */
802 static void
803 RemoveLocalLock(LOCALLOCK *locallock)
804 {
805         LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
806
807         pfree(locallock->lockOwners);
808         locallock->lockOwners = NULL;
809         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
810                                                                                   (void *) &(locallock->tag),
811                                                                                   HASH_REMOVE, NULL);
812         if (!locallock)
813                 elog(WARNING, "locallock table corrupted");
814 }
815
816 /*
817  * LockCheckConflicts -- test whether requested lock conflicts
818  *              with those already granted
819  *
820  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
821  *
822  * NOTES:
823  *              Here's what makes this complicated: one process's locks don't
824  * conflict with one another, even if they are held under different
825  * transaction IDs (eg, session and xact locks do not conflict).
826  * So, we must subtract off our own locks when determining whether the
827  * requested new lock conflicts with those already held.
828  *
829  * The caller can optionally pass the process's total holding counts, if
830  * known.  If NULL is passed then these values will be computed internally.
831  */
832 int
833 LockCheckConflicts(LockMethod lockMethodTable,
834                                    LOCKMODE lockmode,
835                                    LOCK *lock,
836                                    PROCLOCK *proclock,
837                                    PGPROC *proc,
838                                    int *myHolding)              /* myHolding[] array or NULL */
839 {
840         int                     numLockModes = lockMethodTable->numLockModes;
841         LOCKMASK        bitmask;
842         int                     i;
843         int                     localHolding[MAX_LOCKMODES];
844
845         /*
846          * first check for global conflicts: If no locks conflict with my
847          * request, then I get the lock.
848          *
849          * Checking for conflict: lock->grantMask represents the types of
850          * currently held locks.  conflictTable[lockmode] has a bit set for
851          * each type of lock that conflicts with request.       Bitwise compare
852          * tells if there is a conflict.
853          */
854         if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
855         {
856                 PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
857                 return STATUS_OK;
858         }
859
860         /*
861          * Rats.  Something conflicts. But it could still be my own lock.  We
862          * have to construct a conflict mask that does not reflect our own
863          * locks.  Locks held by the current process under another XID also
864          * count as "our own locks".
865          */
866         if (myHolding == NULL)
867         {
868                 /* Caller didn't do calculation of total holding for me */
869                 LockCountMyLocks(proclock->tag.lock, proc, localHolding);
870                 myHolding = localHolding;
871         }
872
873         /* Compute mask of lock types held by other processes */
874         bitmask = 0;
875         for (i = 1; i <= numLockModes; i++)
876         {
877                 if (lock->granted[i] != myHolding[i])
878                         bitmask |= LOCKBIT_ON(i);
879         }
880
881         /*
882          * now check again for conflicts.  'bitmask' describes the types of
883          * locks held by other processes.  If one of these conflicts with the
884          * kind of lock that I want, there is a conflict and I have to sleep.
885          */
886         if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
887         {
888                 /* no conflict. OK to get the lock */
889                 PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
890                 return STATUS_OK;
891         }
892
893         PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
894         return STATUS_FOUND;
895 }
896
897 /*
898  * LockCountMyLocks --- Count total number of locks held on a given lockable
899  *              object by a given process (under any transaction ID).
900  *
901  * XXX This could be rather slow if the process holds a large number of locks.
902  * Perhaps it could be sped up if we kept yet a third hashtable of per-
903  * process lock information.  However, for the normal case where a transaction
904  * doesn't hold a large number of locks, keeping such a table would probably
905  * be a net slowdown.
906  */
907 static void
908 LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
909 {
910         SHM_QUEUE  *procLocks = &(proc->procLocks);
911         PROCLOCK   *proclock;
912
913         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
914
915         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
916                                                                                  offsetof(PROCLOCK, procLink));
917
918         while (proclock)
919         {
920                 if (lockOffset == proclock->tag.lock)
921                 {
922                         LOCKMASK        holdMask = proclock->holdMask;
923                         int                     i;
924
925                         for (i = 1; i < MAX_LOCKMODES; i++)
926                         {
927                                 if (holdMask & LOCKBIT_ON(i))
928                                         myHolding[i]++;
929                         }
930                 }
931
932                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
933                                                                                    offsetof(PROCLOCK, procLink));
934         }
935 }
936
937 /*
938  * GrantLock -- update the lock and proclock data structures to show
939  *              the lock request has been granted.
940  *
941  * NOTE: if proc was blocked, it also needs to be removed from the wait list
942  * and have its waitLock/waitProcLock fields cleared.  That's not done here.
943  *
944  * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
945  * table entry; but since we may be awaking some other process, we can't do
946  * that here; it's done by GrantLockLocal, instead.
947  */
948 void
949 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
950 {
951         lock->nGranted++;
952         lock->granted[lockmode]++;
953         lock->grantMask |= LOCKBIT_ON(lockmode);
954         if (lock->granted[lockmode] == lock->requested[lockmode])
955                 lock->waitMask &= LOCKBIT_OFF(lockmode);
956         proclock->holdMask |= LOCKBIT_ON(lockmode);
957         LOCK_PRINT("GrantLock", lock, lockmode);
958         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
959         Assert(lock->nGranted <= lock->nRequested);
960 }
961
962 /*
963  * UnGrantLock -- opposite of GrantLock. 
964  *
965  * Updates the lock and proclock data structures to show that the lock
966  * is no longer held nor requested by the current holder.
967  *
968  * Returns true if there were any waiters waiting on the lock that
969  * should now be woken up with ProcLockWakeup.
970  */
971 static bool
972 UnGrantLock(LOCK *lock, LOCKMODE lockmode,
973                         PROCLOCK *proclock, LockMethod lockMethodTable)
974 {
975         bool wakeupNeeded = false;
976
977         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
978         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
979         Assert(lock->nGranted <= lock->nRequested);
980
981         /*
982          * fix the general lock stats
983          */
984         lock->nRequested--;
985         lock->requested[lockmode]--;
986         lock->nGranted--;
987         lock->granted[lockmode]--;
988
989         if (lock->granted[lockmode] == 0)
990         {
991                 /* change the conflict mask.  No more of this lock type. */
992                 lock->grantMask &= LOCKBIT_OFF(lockmode);
993         }
994
995         LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
996         Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
997         Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
998         Assert(lock->nGranted <= lock->nRequested);
999
1000         /*
1001          * We need only run ProcLockWakeup if the released lock conflicts with
1002          * at least one of the lock types requested by waiter(s).  Otherwise
1003          * whatever conflict made them wait must still exist.  NOTE: before
1004          * MVCC, we could skip wakeup if lock->granted[lockmode] was still
1005          * positive. But that's not true anymore, because the remaining
1006          * granted locks might belong to some waiter, who could now be
1007          * awakened because he doesn't conflict with his own locks.
1008          */
1009         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1010                 wakeupNeeded = true;
1011
1012         /*
1013          * Now fix the per-proclock state.
1014          */
1015         proclock->holdMask &= LOCKBIT_OFF(lockmode);
1016         PROCLOCK_PRINT("UnGrantLock: updated", proclock);
1017
1018         return wakeupNeeded;
1019 }
1020
1021 /*
1022  * GrantLockLocal -- update the locallock data structures to show
1023  *              the lock request has been granted.
1024  *
1025  * We expect that LockAcquire made sure there is room to add a new
1026  * ResourceOwner entry.
1027  */
1028 static void
1029 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
1030 {
1031         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1032         int                     i;
1033
1034         Assert(locallock->numLockOwners < locallock->maxLockOwners);
1035         /* Count the total */
1036         locallock->nLocks++;
1037         /* Count the per-owner lock */
1038         for (i = 0; i < locallock->numLockOwners; i++)
1039         {
1040                 if (lockOwners[i].owner == owner)
1041                 {
1042                         lockOwners[i].nLocks++;
1043                         return;
1044                 }
1045         }
1046         lockOwners[i].owner = owner;
1047         lockOwners[i].nLocks = 1;
1048         locallock->numLockOwners++;
1049 }
1050
1051 /*
1052  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
1053  *              WaitOnLock on.
1054  *
1055  * proc.c needs this for the case where we are booted off the lock by
1056  * timeout, but discover that someone granted us the lock anyway.
1057  *
1058  * We could just export GrantLockLocal, but that would require including
1059  * resowner.h in lock.h, which creates circularity.
1060  */
1061 void
1062 GrantAwaitedLock(void)
1063 {
1064         GrantLockLocal(awaitedLock, awaitedOwner);
1065 }
1066
1067 /*
1068  * WaitOnLock -- wait to acquire a lock
1069  *
1070  * Caller must have set MyProc->heldLocks to reflect locks already held
1071  * on the lockable object by this process (under all XIDs).
1072  *
1073  * The locktable's masterLock must be held at entry.
1074  */
1075 static int
1076 WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
1077                    ResourceOwner owner)
1078 {
1079         LockMethod      lockMethodTable = LockMethods[lockmethodid];
1080         char       *new_status,
1081                            *old_status;
1082
1083         Assert(lockmethodid < NumLockMethods);
1084
1085         LOCK_PRINT("WaitOnLock: sleeping on lock",
1086                            locallock->lock, locallock->tag.mode);
1087
1088         old_status = pstrdup(get_ps_display());
1089         new_status = (char *) palloc(strlen(old_status) + 10);
1090         strcpy(new_status, old_status);
1091         strcat(new_status, " waiting");
1092         set_ps_display(new_status);
1093
1094         awaitedLock = locallock;
1095         awaitedOwner = owner;
1096
1097         /*
1098          * NOTE: Think not to put any shared-state cleanup after the call to
1099          * ProcSleep, in either the normal or failure path.  The lock state
1100          * must be fully set by the lock grantor, or by CheckDeadLock if we
1101          * give up waiting for the lock.  This is necessary because of the
1102          * possibility that a cancel/die interrupt will interrupt ProcSleep
1103          * after someone else grants us the lock, but before we've noticed it.
1104          * Hence, after granting, the locktable state must fully reflect the
1105          * fact that we own the lock; we can't do additional work on return.
1106          * Contrariwise, if we fail, any cleanup must happen in xact abort
1107          * processing, not here, to ensure it will also happen in the
1108          * cancel/die case.
1109          */
1110
1111         if (ProcSleep(lockMethodTable,
1112                                   locallock->tag.mode,
1113                                   locallock->lock,
1114                                   locallock->proclock) != STATUS_OK)
1115         {
1116                 /*
1117                  * We failed as a result of a deadlock, see CheckDeadLock(). Quit
1118                  * now.  Removal of the proclock and lock objects, if no longer
1119                  * needed, will happen in xact cleanup (see above for motivation).
1120                  */
1121                 awaitedLock = NULL;
1122                 LOCK_PRINT("WaitOnLock: aborting on lock",
1123                                    locallock->lock, locallock->tag.mode);
1124                 LWLockRelease(lockMethodTable->masterLock);
1125
1126                 /*
1127                  * Now that we aren't holding the LockMgrLock, we can give an
1128                  * error report including details about the detected deadlock.
1129                  */
1130                 DeadLockReport();
1131                 /* not reached */
1132         }
1133
1134         awaitedLock = NULL;
1135
1136         set_ps_display(old_status);
1137         pfree(old_status);
1138         pfree(new_status);
1139
1140         LOCK_PRINT("WaitOnLock: wakeup on lock",
1141                            locallock->lock, locallock->tag.mode);
1142         return STATUS_OK;
1143 }
1144
1145 /*
1146  * Remove a proc from the wait-queue it is on
1147  * (caller must know it is on one).
1148  *
1149  * Locktable lock must be held by caller.
1150  *
1151  * NB: this does not remove the process' proclock object, nor the lock object,
1152  * even though their counts might now have gone to zero.  That will happen
1153  * during a subsequent LockReleaseAll call, which we expect will happen
1154  * during transaction cleanup.  (Removal of a proc from its wait queue by
1155  * this routine can only happen if we are aborting the transaction.)
1156  */
1157 void
1158 RemoveFromWaitQueue(PGPROC *proc)
1159 {
1160         LOCK       *waitLock = proc->waitLock;
1161         LOCKMODE        lockmode = proc->waitLockMode;
1162
1163         /* Make sure proc is waiting */
1164         Assert(proc->links.next != INVALID_OFFSET);
1165         Assert(waitLock);
1166         Assert(waitLock->waitProcs.size > 0);
1167
1168         /* Remove proc from lock's wait queue */
1169         SHMQueueDelete(&(proc->links));
1170         waitLock->waitProcs.size--;
1171
1172         /* Undo increments of request counts by waiting process */
1173         Assert(waitLock->nRequested > 0);
1174         Assert(waitLock->nRequested > proc->waitLock->nGranted);
1175         waitLock->nRequested--;
1176         Assert(waitLock->requested[lockmode] > 0);
1177         waitLock->requested[lockmode]--;
1178         /* don't forget to clear waitMask bit if appropriate */
1179         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1180                 waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1181
1182         /* Clean up the proc's own state */
1183         proc->waitLock = NULL;
1184         proc->waitProcLock = NULL;
1185
1186         /* See if any other waiters for the lock can be woken up now */
1187         ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
1188 }
1189
1190 /*
1191  * LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
1192  *              release one 'lockmode' lock on it.
1193  *
1194  * Side Effects: find any waiting processes that are now wakable,
1195  *              grant them their requested locks and awaken them.
1196  *              (We have to grant the lock here to avoid a race between
1197  *              the waking process and any new process to
1198  *              come along and request the lock.)
1199  */
1200 bool
1201 LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
1202                         TransactionId xid, LOCKMODE lockmode)
1203 {
1204         LOCALLOCKTAG localtag;
1205         LOCALLOCK  *locallock;
1206         LOCK       *lock;
1207         PROCLOCK   *proclock;
1208         LWLockId        masterLock;
1209         LockMethod      lockMethodTable;
1210         bool            wakeupNeeded = false;
1211
1212 #ifdef LOCK_DEBUG
1213         if (lockmethodid == USER_LOCKMETHOD && Trace_userlocks)
1214                 elog(LOG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
1215 #endif
1216
1217         /* ???????? This must be changed when short term locks will be used */
1218         locktag->lockmethodid = lockmethodid;
1219
1220         Assert(lockmethodid < NumLockMethods);
1221         lockMethodTable = LockMethods[lockmethodid];
1222         if (!lockMethodTable)
1223         {
1224                 elog(WARNING, "lockMethodTable is null in LockRelease");
1225                 return FALSE;
1226         }
1227
1228         /*
1229          * Find the LOCALLOCK entry for this lock and lockmode
1230          */
1231         MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
1232         localtag.lock = *locktag;
1233         localtag.xid = xid;
1234         localtag.mode = lockmode;
1235
1236         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
1237                                                                                   (void *) &localtag,
1238                                                                                   HASH_FIND, NULL);
1239
1240         /*
1241          * let the caller print its own error message, too. Do not
1242          * ereport(ERROR).
1243          */
1244         if (!locallock || locallock->nLocks <= 0)
1245         {
1246                 elog(WARNING, "you don't own a lock of type %s",
1247                          lock_mode_names[lockmode]);
1248                 return FALSE;
1249         }
1250
1251         /*
1252          * Decrease the count for the resource owner.
1253          */
1254         {
1255                 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1256                 ResourceOwner owner;
1257                 int                     i;
1258
1259                 /* Session locks and user locks are not transactional */
1260                 if (xid != InvalidTransactionId &&
1261                         lockmethodid == DEFAULT_LOCKMETHOD)
1262                         owner = CurrentResourceOwner;
1263                 else
1264                         owner = NULL;
1265
1266                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1267                 {
1268                         if (lockOwners[i].owner == owner)
1269                         {
1270                                 Assert(lockOwners[i].nLocks > 0);
1271                                 if (--lockOwners[i].nLocks == 0)
1272                                 {
1273                                         /* compact out unused slot */
1274                                         locallock->numLockOwners--;
1275                                         if (i < locallock->numLockOwners)
1276                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1277                                 }
1278                                 break;
1279                         }
1280                 }
1281                 if (i < 0)
1282                 {
1283                         /* don't release a lock belonging to another owner */
1284                         elog(WARNING, "you don't own a lock of type %s",
1285                                  lock_mode_names[lockmode]);
1286                         return FALSE;
1287                 }
1288         }
1289
1290         /*
1291          * Decrease the total local count.      If we're still holding the lock,
1292          * we're done.
1293          */
1294         locallock->nLocks--;
1295
1296         if (locallock->nLocks > 0)
1297                 return TRUE;
1298
1299         /*
1300          * Otherwise we've got to mess with the shared lock table.
1301          */
1302         masterLock = lockMethodTable->masterLock;
1303
1304         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1305
1306         /*
1307          * We don't need to re-find the lock or proclock, since we kept their
1308          * addresses in the locallock table, and they couldn't have been
1309          * removed while we were holding a lock on them.
1310          */
1311         lock = locallock->lock;
1312         LOCK_PRINT("LockRelease: found", lock, lockmode);
1313         proclock = locallock->proclock;
1314         PROCLOCK_PRINT("LockRelease: found", proclock);
1315
1316         /*
1317          * Double-check that we are actually holding a lock of the type we
1318          * want to release.
1319          */
1320         if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1321         {
1322                 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
1323                 LWLockRelease(masterLock);
1324                 elog(WARNING, "you don't own a lock of type %s",
1325                          lock_mode_names[lockmode]);
1326                 RemoveLocalLock(locallock);
1327                 return FALSE;
1328         }
1329
1330         wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
1331
1332         /*
1333          * If this was my last hold on this lock, delete my entry in the
1334          * proclock table.
1335          */
1336         if (proclock->holdMask == 0)
1337         {
1338                 PROCLOCK_PRINT("LockRelease: deleting", proclock);
1339                 SHMQueueDelete(&proclock->lockLink);
1340                 SHMQueueDelete(&proclock->procLink);
1341                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
1342                                                                                         (void *) &(proclock->tag),
1343                                                                                         HASH_REMOVE, NULL);
1344                 if (!proclock)
1345                 {
1346                         LWLockRelease(masterLock);
1347                         elog(WARNING, "proclock table corrupted");
1348                         RemoveLocalLock(locallock);
1349                         return FALSE;
1350                 }
1351         }
1352
1353         if (lock->nRequested == 0)
1354         {
1355                 /*
1356                  * We've just released the last lock, so garbage-collect the lock
1357                  * object.
1358                  */
1359                 Assert(SHMQueueEmpty(&(lock->procLocks)));
1360                 lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1361                                                                         (void *) &(lock->tag),
1362                                                                         HASH_REMOVE, NULL);
1363                 if (!lock)
1364                 {
1365                         LWLockRelease(masterLock);
1366                         elog(WARNING, "lock table corrupted");
1367                         RemoveLocalLock(locallock);
1368                         return FALSE;
1369                 }
1370         }
1371         else
1372         {
1373                 /*
1374                  * Wake up waiters if needed.
1375                  */
1376                 if (wakeupNeeded)
1377                         ProcLockWakeup(lockMethodTable, lock);
1378         }
1379
1380         LWLockRelease(masterLock);
1381
1382         RemoveLocalLock(locallock);
1383         return TRUE;
1384 }
1385
1386 /*
1387  * LockReleaseAll -- Release all locks of the specified lock method that
1388  *              are held by the current process.
1389  *
1390  * Well, not necessarily *all* locks.  The available behaviors are:
1391  *
1392  * allxids == true: release all locks regardless of transaction
1393  * affiliation.
1394  *
1395  * allxids == false: release all locks with Xid != 0
1396  * (zero is the Xid used for "session" locks).
1397  */
1398 bool
1399 LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
1400 {
1401         HASH_SEQ_STATUS status;
1402         SHM_QUEUE  *procLocks = &(MyProc->procLocks);
1403         LWLockId        masterLock;
1404         LockMethod      lockMethodTable;
1405         int                     i,
1406                                 numLockModes;
1407         LOCALLOCK  *locallock;
1408         PROCLOCK   *proclock;
1409         LOCK       *lock;
1410
1411 #ifdef LOCK_DEBUG
1412         if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1413                 elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
1414 #endif
1415
1416         Assert(lockmethodid < NumLockMethods);
1417         lockMethodTable = LockMethods[lockmethodid];
1418         if (!lockMethodTable)
1419         {
1420                 elog(WARNING, "bad lock method: %d", lockmethodid);
1421                 return FALSE;
1422         }
1423
1424         numLockModes = lockMethodTable->numLockModes;
1425         masterLock = lockMethodTable->masterLock;
1426
1427         /*
1428          * First we run through the locallock table and get rid of unwanted
1429          * entries, then we scan the process's proclocks and get rid of those.
1430          * We do this separately because we may have multiple locallock
1431          * entries pointing to the same proclock, and we daren't end up with
1432          * any dangling pointers.
1433          */
1434         hash_seq_init(&status, LockMethodLocalHash[lockmethodid]);
1435
1436         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1437         {
1438                 if (locallock->proclock == NULL || locallock->lock == NULL)
1439                 {
1440                         /*
1441                          * We must've run out of shared memory while trying to set up
1442                          * this lock.  Just forget the local entry.
1443                          */
1444                         Assert(locallock->nLocks == 0);
1445                         RemoveLocalLock(locallock);
1446                         continue;
1447                 }
1448
1449                 /* Ignore items that are not of the lockmethod to be removed */
1450                 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
1451                         continue;
1452
1453                 /*
1454                  * Ignore locks with Xid=0 unless we are asked to release all
1455                  * locks
1456                  */
1457                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
1458                         && !allxids)
1459                         continue;
1460
1461                 RemoveLocalLock(locallock);
1462         }
1463
1464         LWLockAcquire(masterLock, LW_EXCLUSIVE);
1465
1466         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1467                                                                                  offsetof(PROCLOCK, procLink));
1468
1469         while (proclock)
1470         {
1471                 bool            wakeupNeeded = false;
1472                 PROCLOCK   *nextHolder;
1473
1474                 /* Get link first, since we may unlink/delete this proclock */
1475                 nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
1476                                                                                    offsetof(PROCLOCK, procLink));
1477
1478                 Assert(proclock->tag.proc == MAKE_OFFSET(MyProc));
1479
1480                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1481
1482                 /* Ignore items that are not of the lockmethod to be removed */
1483                 if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
1484                         goto next_item;
1485
1486                 /*
1487                  * Ignore locks with Xid=0 unless we are asked to release all
1488                  * locks
1489                  */
1490                 if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
1491                         && !allxids)
1492                         goto next_item;
1493
1494                 PROCLOCK_PRINT("LockReleaseAll", proclock);
1495                 LOCK_PRINT("LockReleaseAll", lock, 0);
1496                 Assert(lock->nRequested >= 0);
1497                 Assert(lock->nGranted >= 0);
1498                 Assert(lock->nGranted <= lock->nRequested);
1499                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
1500
1501                 /*
1502                  * fix the general lock stats
1503                  */
1504                 if (proclock->holdMask)
1505                 {
1506                         for (i = 1; i <= numLockModes; i++)
1507                         {
1508                                 if (proclock->holdMask & LOCKBIT_ON(i))
1509                                         wakeupNeeded |= UnGrantLock(lock, i, proclock,
1510                                                                                                 lockMethodTable);
1511                         }
1512                 }
1513                 Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1514                 Assert(lock->nGranted <= lock->nRequested);
1515                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1516
1517                 PROCLOCK_PRINT("LockReleaseAll: deleting", proclock);
1518
1519                 /*
1520                  * Remove the proclock entry from the linked lists
1521                  */
1522                 SHMQueueDelete(&proclock->lockLink);
1523                 SHMQueueDelete(&proclock->procLink);
1524
1525                 /*
1526                  * remove the proclock entry from the hashtable
1527                  */
1528                 proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid],
1529                                                                                         (void *) &(proclock->tag),
1530                                                                                         HASH_REMOVE,
1531                                                                                         NULL);
1532                 if (!proclock)
1533                 {
1534                         LWLockRelease(masterLock);
1535                         elog(WARNING, "proclock table corrupted");
1536                         return FALSE;
1537                 }
1538
1539                 if (lock->nRequested == 0)
1540                 {
1541                         /*
1542                          * We've just released the last lock, so garbage-collect the
1543                          * lock object.
1544                          */
1545                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1546                         Assert(SHMQueueEmpty(&(lock->procLocks)));
1547                         lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid],
1548                                                                                 (void *) &(lock->tag),
1549                                                                                 HASH_REMOVE, NULL);
1550                         if (!lock)
1551                         {
1552                                 LWLockRelease(masterLock);
1553                                 elog(WARNING, "lock table corrupted");
1554                                 return FALSE;
1555                         }
1556                 }
1557                 else if (wakeupNeeded)
1558                         ProcLockWakeup(lockMethodTable, lock);
1559
1560 next_item:
1561                 proclock = nextHolder;
1562         }
1563
1564         LWLockRelease(masterLock);
1565
1566 #ifdef LOCK_DEBUG
1567         if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1568                 elog(LOG, "LockReleaseAll done");
1569 #endif
1570
1571         return TRUE;
1572 }
1573
1574 /*
1575  * LockReleaseCurrentOwner
1576  *              Release all locks belonging to CurrentResourceOwner
1577  *
1578  * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner.
1579  */
1580 void
1581 LockReleaseCurrentOwner(void)
1582 {
1583         HASH_SEQ_STATUS status;
1584         LOCALLOCK  *locallock;
1585         LOCALLOCKOWNER *lockOwners;
1586         int                     i;
1587
1588         hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
1589
1590         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1591         {
1592                 /* Ignore items that must be nontransactional */
1593                 if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
1594                         continue;
1595                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
1596                         continue;
1597
1598                 /* Scan to see if there are any locks belonging to current owner */
1599                 lockOwners = locallock->lockOwners;
1600                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1601                 {
1602                         if (lockOwners[i].owner == CurrentResourceOwner)
1603                         {
1604                                 Assert(lockOwners[i].nLocks > 0);
1605                                 if (lockOwners[i].nLocks < locallock->nLocks)
1606                                 {
1607                                         /*
1608                                          * We will still hold this lock after forgetting this
1609                                          * ResourceOwner.
1610                                          */
1611                                         locallock->nLocks -= lockOwners[i].nLocks;
1612                                         /* compact out unused slot */
1613                                         locallock->numLockOwners--;
1614                                         if (i < locallock->numLockOwners)
1615                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1616                                 }
1617                                 else
1618                                 {
1619                                         Assert(lockOwners[i].nLocks == locallock->nLocks);
1620                                         /* We want to call LockRelease just once */
1621                                         lockOwners[i].nLocks = 1;
1622                                         locallock->nLocks = 1;
1623                                         if (!LockRelease(DEFAULT_LOCKMETHOD,
1624                                                                          &locallock->tag.lock,
1625                                                                          locallock->tag.xid,
1626                                                                          locallock->tag.mode))
1627                                                 elog(WARNING, "LockReleaseCurrentOwner: failed??");
1628                                 }
1629                                 break;
1630                         }
1631                 }
1632         }
1633 }
1634
1635 /*
1636  * LockReassignCurrentOwner
1637  *              Reassign all locks belonging to CurrentResourceOwner to belong
1638  *              to its parent resource owner
1639  */
1640 void
1641 LockReassignCurrentOwner(void)
1642 {
1643         ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
1644         HASH_SEQ_STATUS status;
1645         LOCALLOCK  *locallock;
1646         LOCALLOCKOWNER *lockOwners;
1647
1648         Assert(parent != NULL);
1649
1650         hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]);
1651
1652         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1653         {
1654                 int                     i;
1655                 int                     ic = -1;
1656                 int                     ip = -1;
1657
1658                 /* Ignore items that must be nontransactional */
1659                 if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
1660                         continue;
1661                 if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
1662                         continue;
1663
1664                 /*
1665                  * Scan to see if there are any locks belonging to current owner
1666                  * or its parent
1667                  */
1668                 lockOwners = locallock->lockOwners;
1669                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1670                 {
1671                         if (lockOwners[i].owner == CurrentResourceOwner)
1672                                 ic = i;
1673                         else if (lockOwners[i].owner == parent)
1674                                 ip = i;
1675                 }
1676
1677                 if (ic < 0)
1678                         continue;                       /* no current locks */
1679
1680                 if (ip < 0)
1681                 {
1682                         /* Parent has no slot, so just give it child's slot */
1683                         lockOwners[ic].owner = parent;
1684                 }
1685                 else
1686                 {
1687                         /* Merge child's count with parent's */
1688                         lockOwners[ip].nLocks += lockOwners[ic].nLocks;
1689                         /* compact out unused slot */
1690                         locallock->numLockOwners--;
1691                         if (ic < locallock->numLockOwners)
1692                                 lockOwners[ic] = lockOwners[locallock->numLockOwners];
1693                 }
1694         }
1695 }
1696
1697
1698 /*
1699  * Estimate shared-memory space used for lock tables
1700  */
1701 int
1702 LockShmemSize(int maxBackends)
1703 {
1704         int                     size = 0;
1705         long            max_table_size = NLOCKENTS(maxBackends);
1706
1707         /* lock method headers */
1708         size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LockMethodData));
1709
1710         /* lockHash table */
1711         size += hash_estimate_size(max_table_size, sizeof(LOCK));
1712
1713         /* proclockHash table */
1714         size += hash_estimate_size(max_table_size, sizeof(PROCLOCK));
1715
1716         /*
1717          * Note we count only one pair of hash tables, since the userlocks
1718          * table actually overlays the main one.
1719          *
1720          * Since the lockHash entry count above is only an estimate, add 10%
1721          * safety margin.
1722          */
1723         size += size / 10;
1724
1725         return size;
1726 }
1727
1728 /*
1729  * GetLockStatusData - Return a summary of the lock manager's internal
1730  * status, for use in a user-level reporting function.
1731  *
1732  * The return data consists of an array of PROCLOCK objects, with the
1733  * associated PGPROC and LOCK objects for each.  Note that multiple
1734  * copies of the same PGPROC and/or LOCK objects are likely to appear.
1735  * It is the caller's responsibility to match up duplicates if wanted.
1736  *
1737  * The design goal is to hold the LockMgrLock for as short a time as possible;
1738  * thus, this function simply makes a copy of the necessary data and releases
1739  * the lock, allowing the caller to contemplate and format the data for as
1740  * long as it pleases.
1741  */
1742 LockData *
1743 GetLockStatusData(void)
1744 {
1745         LockData   *data;
1746         HTAB       *proclockTable;
1747         PROCLOCK   *proclock;
1748         HASH_SEQ_STATUS seqstat;
1749         int                     i;
1750
1751         data = (LockData *) palloc(sizeof(LockData));
1752
1753         LWLockAcquire(LockMgrLock, LW_EXCLUSIVE);
1754
1755         proclockTable = LockMethodProcLockHash[DEFAULT_LOCKMETHOD];
1756
1757         data->nelements = i = proclockTable->hctl->nentries;
1758
1759         data->proclockaddrs = (SHMEM_OFFSET *) palloc(sizeof(SHMEM_OFFSET) * i);
1760         data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * i);
1761         data->procs = (PGPROC *) palloc(sizeof(PGPROC) * i);
1762         data->locks = (LOCK *) palloc(sizeof(LOCK) * i);
1763
1764         hash_seq_init(&seqstat, proclockTable);
1765
1766         i = 0;
1767         while ((proclock = hash_seq_search(&seqstat)))
1768         {
1769                 PGPROC     *proc = (PGPROC *) MAKE_PTR(proclock->tag.proc);
1770                 LOCK       *lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1771
1772                 data->proclockaddrs[i] = MAKE_OFFSET(proclock);
1773                 memcpy(&(data->proclocks[i]), proclock, sizeof(PROCLOCK));
1774                 memcpy(&(data->procs[i]), proc, sizeof(PGPROC));
1775                 memcpy(&(data->locks[i]), lock, sizeof(LOCK));
1776
1777                 i++;
1778         }
1779
1780         LWLockRelease(LockMgrLock);
1781
1782         Assert(i == data->nelements);
1783
1784         return data;
1785 }
1786
1787 /* Provide the textual name of any lock mode */
1788 const char *
1789 GetLockmodeName(LOCKMODE mode)
1790 {
1791         Assert(mode <= MAX_LOCKMODES);
1792         return lock_mode_names[mode];
1793 }
1794
1795 #ifdef LOCK_DEBUG
1796 /*
1797  * Dump all locks in the MyProc->procLocks list.
1798  *
1799  * Must have already acquired the masterLock.
1800  */
1801 void
1802 DumpLocks(void)
1803 {
1804         PGPROC     *proc;
1805         SHM_QUEUE  *procLocks;
1806         PROCLOCK   *proclock;
1807         LOCK       *lock;
1808         int                     lockmethodid = DEFAULT_LOCKMETHOD;
1809         LockMethod      lockMethodTable;
1810
1811         proc = MyProc;
1812         if (proc == NULL)
1813                 return;
1814
1815         procLocks = &proc->procLocks;
1816
1817         Assert(lockmethodid < NumLockMethods);
1818         lockMethodTable = LockMethods[lockmethodid];
1819         if (!lockMethodTable)
1820                 return;
1821
1822         if (proc->waitLock)
1823                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1824
1825         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1826                                                                                  offsetof(PROCLOCK, procLink));
1827
1828         while (proclock)
1829         {
1830                 Assert(proclock->tag.proc == MAKE_OFFSET(proc));
1831
1832                 lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1833
1834                 PROCLOCK_PRINT("DumpLocks", proclock);
1835                 LOCK_PRINT("DumpLocks", lock, 0);
1836
1837                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
1838                                                                                    offsetof(PROCLOCK, procLink));
1839         }
1840 }
1841
1842 /*
1843  * Dump all postgres locks. Must have already acquired the masterLock.
1844  */
1845 void
1846 DumpAllLocks(void)
1847 {
1848         PGPROC     *proc;
1849         PROCLOCK   *proclock;
1850         LOCK       *lock;
1851         int                     lockmethodid = DEFAULT_LOCKMETHOD;
1852         LockMethod      lockMethodTable;
1853         HTAB       *proclockTable;
1854         HASH_SEQ_STATUS status;
1855
1856         proc = MyProc;
1857         if (proc == NULL)
1858                 return;
1859
1860         Assert(lockmethodid < NumLockMethods);
1861         lockMethodTable = LockMethods[lockmethodid];
1862         if (!lockMethodTable)
1863                 return;
1864
1865         proclockTable = LockMethodProcLockHash[lockmethodid];
1866
1867         if (proc->waitLock)
1868                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1869
1870         hash_seq_init(&status, proclockTable);
1871         while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
1872         {
1873                 PROCLOCK_PRINT("DumpAllLocks", proclock);
1874
1875                 if (proclock->tag.lock)
1876                 {
1877                         lock = (LOCK *) MAKE_PTR(proclock->tag.lock);
1878                         LOCK_PRINT("DumpAllLocks", lock, 0);
1879                 }
1880                 else
1881                         elog(LOG, "DumpAllLocks: proclock->tag.lock = NULL");
1882         }
1883 }
1884
1885 #endif   /* LOCK_DEBUG */