1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES primary lock mechanism
5  *
6  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.168 2006/07/23 23:08:46 tgl Exp $
12  *
13  * NOTES
14  *        A lock table is a shared memory hash table.  When
15  *        a process tries to acquire a lock of a type that conflicts
16  *        with existing locks, it is put to sleep using the routines
17  *        in storage/lmgr/proc.c.
18  *
19  *        For the most part, this code should be invoked via lmgr.c
20  *        or another lock-management module, not directly.
21  *
22  *      Interface:
23  *
24  *      InitLocks(), GetLocksMethodTable(),
25  *      LockAcquire(), LockRelease(), LockReleaseAll(),
26  *      LockCheckConflicts(), GrantLock()
27  *
28  *-------------------------------------------------------------------------
29  */
30 #include "postgres.h"
31
32 #include <signal.h>
33 #include <unistd.h>
34
35 #include "access/transam.h"
36 #include "access/twophase.h"
37 #include "access/twophase_rmgr.h"
38 #include "miscadmin.h"
39 #include "utils/memutils.h"
40 #include "utils/ps_status.h"
41 #include "utils/resowner.h"
42
43
44 /* This configuration variable is used to set the lock table size */
45 int                     max_locks_per_xact; /* set by guc.c */
46
47 #define NLOCKENTS() \
48         mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
49
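/*
 * Illustrative sizing only (actual values depend on configuration): with
 * the default max_locks_per_xact = 64 and, say, 100 backends plus 5
 * prepared transactions, NLOCKENTS() would reserve room for roughly
 * 64 * 105 = 6720 lock table entries.
 */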
50
51 /*
52  * Data structures defining the semantics of the standard lock methods.
53  *
54  * The conflict table defines the semantics of the various lock modes.
55  */
56 static const LOCKMASK LockConflicts[] = {
57         0,
58
59         /* AccessShareLock */
60         (1 << AccessExclusiveLock),
61
62         /* RowShareLock */
63         (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
64
65         /* RowExclusiveLock */
66         (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
67         (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
68
69         /* ShareUpdateExclusiveLock */
70         (1 << ShareUpdateExclusiveLock) |
71         (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
72         (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
73
74         /* ShareLock */
75         (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
76         (1 << ShareRowExclusiveLock) |
77         (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
78
79         /* ShareRowExclusiveLock */
80         (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
81         (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
82         (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
83
84         /* ExclusiveLock */
85         (1 << RowShareLock) |
86         (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
87         (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
88         (1 << ExclusiveLock) | (1 << AccessExclusiveLock),
89
90         /* AccessExclusiveLock */
91         (1 << AccessShareLock) | (1 << RowShareLock) |
92         (1 << RowExclusiveLock) | (1 << ShareUpdateExclusiveLock) |
93         (1 << ShareLock) | (1 << ShareRowExclusiveLock) |
94         (1 << ExclusiveLock) | (1 << AccessExclusiveLock)
95
96 };
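/*
 * An illustrative reading of the table above (no additional semantics):
 * the RowExclusiveLock entry includes the ShareLock bit, so
 *
 *		LockConflicts[RowExclusiveLock] & LOCKBIT_ON(ShareLock)
 *
 * is nonzero and the two modes conflict, whereas the AccessShareLock entry
 * contains only AccessExclusiveLock, so plain readers never conflict with
 * one another.
 */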
97
98 /* Names of lock modes, for debug printouts */
99 static const char *const lock_mode_names[] =
100 {
101         "INVALID",
102         "AccessShareLock",
103         "RowShareLock",
104         "RowExclusiveLock",
105         "ShareUpdateExclusiveLock",
106         "ShareLock",
107         "ShareRowExclusiveLock",
108         "ExclusiveLock",
109         "AccessExclusiveLock"
110 };
111
112 #ifndef LOCK_DEBUG
113 static bool             Dummy_trace = false;
114 #endif
115
116 static const LockMethodData default_lockmethod = {
117         AccessExclusiveLock,            /* highest valid lock mode number */
118         true,
119         LockConflicts,
120         lock_mode_names,
121 #ifdef LOCK_DEBUG
122         &Trace_locks
123 #else
124         &Dummy_trace
125 #endif
126 };
127
128 #ifdef USER_LOCKS
129
130 static const LockMethodData user_lockmethod = {
131         AccessExclusiveLock,            /* highest valid lock mode number */
132         false,
133         LockConflicts,
134         lock_mode_names,
135 #ifdef LOCK_DEBUG
136         &Trace_userlocks
137 #else
138         &Dummy_trace
139 #endif
140 };
141
142 #endif /* USER_LOCKS */
143
144 /*
145  * map from lock method id to the lock table data structures
146  */
147 static const LockMethod LockMethods[] = {
148         NULL,
149         &default_lockmethod,
150 #ifdef USER_LOCKS
151         &user_lockmethod
152 #endif
153 };
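/*
 * The LOCKMETHODID carried in each LOCKTAG indexes this array; per the
 * constants in lock.h, DEFAULT_LOCKMETHOD (1) selects default_lockmethod
 * and USER_LOCKMETHOD (2) selects user_lockmethod.
 */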
154
155
156 /* Record that's written to 2PC state file when a lock is persisted */
157 typedef struct TwoPhaseLockRecord
158 {
159         LOCKTAG         locktag;
160         LOCKMODE        lockmode;
161 } TwoPhaseLockRecord;
162
163
164 /*
165  * Pointers to hash tables containing lock state
166  *
167  * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
168  * shared memory; LockMethodLocalHash is local to each backend.
169  */
170 static HTAB *LockMethodLockHash;
171 static HTAB *LockMethodProcLockHash;
172 static HTAB *LockMethodLocalHash;
173
174
175 /* private state for GrantAwaitedLock */
176 static LOCALLOCK *awaitedLock;
177 static ResourceOwner awaitedOwner;
178
179
180 #ifdef LOCK_DEBUG
181
182 /*------
183  * The following configuration options are available for lock debugging:
184  *
185  *         TRACE_LOCKS          -- give a bunch of output about what's going on in this file
186  *         TRACE_USERLOCKS      -- same but for user locks
187  *         TRACE_LOCK_OIDMIN    -- do not trace locks for tables below this oid
188  *                                                 (use this to avoid output on system tables)
189  *         TRACE_LOCK_TABLE     -- trace locks on this table (oid) unconditionally
190  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
191  *
192  * Furthermore, in storage/lmgr/lwlock.c:
193  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
194  *
195  * Define LOCK_DEBUG at compile time to get all these enabled.
196  * ------
197  */
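/*
 * For instance, in a server built with LOCK_DEBUG defined, a possible
 * postgresql.conf fragment (the GUC names mirror the symbols above) is:
 *
 *		trace_locks = on
 *		trace_lock_oidmin = 16384
 *		debug_deadlocks = on
 *
 * which traces ordinary lock traffic on user tables while leaving the
 * system catalogs quiet.
 */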
198
199 int                     Trace_lock_oidmin = FirstNormalObjectId;
200 bool            Trace_locks = false;
201 bool            Trace_userlocks = false;
202 int                     Trace_lock_table = 0;
203 bool            Debug_deadlocks = false;
204
205
206 inline static bool
207 LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
208 {
209         return
210                 (*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
211                  ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
212                 || (Trace_lock_table &&
213                         (tag->locktag_field2 == Trace_lock_table));
214 }
215
216
217 inline static void
218 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
219 {
220         if (LOCK_DEBUG_ENABLED(&lock->tag))
221                 elog(LOG,
222                          "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
223                          "req(%d,%d,%d,%d,%d,%d,%d)=%d "
224                          "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
225                          where, lock,
226                          lock->tag.locktag_field1, lock->tag.locktag_field2,
227                          lock->tag.locktag_field3, lock->tag.locktag_field4,
228                          lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
229                          lock->grantMask,
230                          lock->requested[1], lock->requested[2], lock->requested[3],
231                          lock->requested[4], lock->requested[5], lock->requested[6],
232                          lock->requested[7], lock->nRequested,
233                          lock->granted[1], lock->granted[2], lock->granted[3],
234                          lock->granted[4], lock->granted[5], lock->granted[6],
235                          lock->granted[7], lock->nGranted,
236                          lock->waitProcs.size,
237                          LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
238 }
239
240
241 inline static void
242 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
243 {
244         if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
245                 elog(LOG,
246                          "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
247                          where, proclockP, proclockP->tag.myLock,
248                          PROCLOCK_LOCKMETHOD(*(proclockP)),
249                          proclockP->tag.myProc, (int) proclockP->holdMask);
250 }
251 #else                                                   /* not LOCK_DEBUG */
252
253 #define LOCK_PRINT(where, lock, type)
254 #define PROCLOCK_PRINT(where, proclockP)
255 #endif   /* not LOCK_DEBUG */
256
257
258 static uint32 proclock_hash(const void *key, Size keysize);
259 static void RemoveLocalLock(LOCALLOCK *locallock);
260 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
261 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
262 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
263                         PROCLOCK *proclock, LockMethod lockMethodTable);
264 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
265                         LockMethod lockMethodTable, uint32 hashcode,
266                         bool wakeupNeeded);
267
268
269 /*
270  * InitLocks -- Initialize the lock manager's data structures.
271  *
272  * This is called from CreateSharedMemoryAndSemaphores(), which see for
273  * more comments.  In the normal postmaster case, the shared hash tables
274  * are created here, as well as a locallock hash table that will remain
275  * unused and empty in the postmaster itself.  Backends inherit the pointers
276  * to the shared tables via fork(), and also inherit an image of the locallock
277  * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
278  * backend re-executes this code to obtain pointers to the already existing
279  * shared hash tables and to create its locallock hash table.
280  */
281 void
282 InitLocks(void)
283 {
284         HASHCTL         info;
285         int                     hash_flags;
286         long            init_table_size,
287                                 max_table_size;
288
289         /*
290          * Compute init/max size to request for lock hashtables.  Note these
291          * calculations must agree with LockShmemSize!
292          */
293         max_table_size = NLOCKENTS();
294         init_table_size = max_table_size / 2;
295
296         /*
297          * Allocate hash table for LOCK structs.  This stores
298          * per-locked-object information.
299          */
300         MemSet(&info, 0, sizeof(info));
301         info.keysize = sizeof(LOCKTAG);
302         info.entrysize = sizeof(LOCK);
303         info.hash = tag_hash;
304         info.num_partitions = NUM_LOCK_PARTITIONS;
305         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
306
307         LockMethodLockHash = ShmemInitHash("LOCK hash",
308                                                                            init_table_size,
309                                                                            max_table_size,
310                                                                            &info,
311                                                                            hash_flags);
312         if (!LockMethodLockHash)
313                 elog(FATAL, "could not initialize lock hash table");
314
315         /* Assume an average of 2 holders per lock */
316         max_table_size *= 2;
317         init_table_size *= 2;
318
319         /*
320          * Allocate hash table for PROCLOCK structs.  This stores
321          * per-lock-per-holder information.
322          */
323         info.keysize = sizeof(PROCLOCKTAG);
324         info.entrysize = sizeof(PROCLOCK);
325         info.hash = proclock_hash;
326         info.num_partitions = NUM_LOCK_PARTITIONS;
327         hash_flags = (HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
328
329         LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
330                                                                                    init_table_size,
331                                                                                    max_table_size,
332                                                                                    &info,
333                                                                                    hash_flags);
334         if (!LockMethodProcLockHash)
335                 elog(FATAL, "could not initialize proclock hash table");
336
337         /*
338          * Allocate non-shared hash table for LOCALLOCK structs.  This stores
339          * lock counts and resource owner information.
340          *
341          * The non-shared table could already exist in this process (this occurs
342          * when the postmaster is recreating shared memory after a backend crash).
343          * If so, delete and recreate it.  (We could simply leave it, since it
344          * ought to be empty in the postmaster, but for safety let's zap it.)
345          */
346         if (LockMethodLocalHash)
347                 hash_destroy(LockMethodLocalHash);
348
349         info.keysize = sizeof(LOCALLOCKTAG);
350         info.entrysize = sizeof(LOCALLOCK);
351         info.hash = tag_hash;
352         hash_flags = (HASH_ELEM | HASH_FUNCTION);
353
354         LockMethodLocalHash = hash_create("LOCALLOCK hash",
355                                                                           128,
356                                                                           &info,
357                                                                           hash_flags);
358 }
359
360
361 /*
362  * Fetch the lock method table associated with a given lock
363  */
364 LockMethod
365 GetLocksMethodTable(const LOCK *lock)
366 {
367         LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
368
369         Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
370         return LockMethods[lockmethodid];
371 }
372
373
374 /*
375  * Compute the hash code associated with a LOCKTAG.
376  *
377  * To avoid unnecessary recomputations of the hash code, we try to do this
378  * just once per function, and then pass it around as needed.  Aside from
379  * passing the hashcode to hash_search_with_hash_value(), we can extract
380  * the lock partition number from the hashcode.
381  */
382 uint32
383 LockTagHashCode(const LOCKTAG *locktag)
384 {
385         return get_hash_value(LockMethodLockHash, (const void *) locktag);
386 }
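/*
 * A minimal sketch of how callers reuse the hashcode (LockAcquire below
 * does exactly this):
 *
 *		hashcode = LockTagHashCode(locktag);
 *		partition = LockHashPartition(hashcode);
 *		partitionLock = LockHashPartitionLock(hashcode);
 */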
387
388 /*
389  * Compute the hash code associated with a PROCLOCKTAG.
390  *
391  * Because we want to use just one set of partition locks for both the
392  * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
393  * fall into the same partition number as their associated LOCKs.
394  * dynahash.c expects the partition number to be the low-order bits of
395  * the hash code, and therefore a PROCLOCKTAG's hash code must have the
396  * same low-order bits as the associated LOCKTAG's hash code.  We achieve
397  * this with this specialized hash function.
398  */
399 static uint32
400 proclock_hash(const void *key, Size keysize)
401 {
402         const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
403         uint32  lockhash;
404         Datum   procptr;
405
406         Assert(keysize == sizeof(PROCLOCKTAG));
407
408         /* Look into the associated LOCK object, and compute its hash code */
409         lockhash = LockTagHashCode(&proclocktag->myLock->tag);
410
411         /*
412          * To make the hash code also depend on the PGPROC, we xor the proc
413          * struct's address into the hash code, left-shifted so that the
414          * partition-number bits don't change.  Since this is only a hash,
415          * we don't care if we lose high-order bits of the address; use
416          * an intermediate variable to suppress cast-pointer-to-int warnings.
417          */
418         procptr = PointerGetDatum(proclocktag->myProc);
419         lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
420
421         return lockhash;
422 }
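/*
 * Worked illustration (assuming LOG2_NUM_LOCK_PARTITIONS is 4, i.e. 16
 * partitions): the PGPROC pointer is shifted left by 4 bits before being
 * XOR'd in, so the low 4 bits of the PROCLOCK hash are exactly the low
 * 4 bits of the underlying LOCK hash, and both entries therefore fall
 * into the same partition.
 */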
423
424 /*
425  * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
426  * for its underlying LOCK.
427  *
428  * We use this just to avoid redundant calls of LockTagHashCode().
429  */
430 static inline uint32
431 ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
432 {
433         uint32  lockhash = hashcode;
434         Datum   procptr;
435
436         /*
437          * This must match proclock_hash()!
438          */
439         procptr = PointerGetDatum(proclocktag->myProc);
440         lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
441
442         return lockhash;
443 }
444
445
446 /*
447  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
448  *              set lock if/when no conflicts.
449  *
450  * Inputs:
451  *      locktag: unique identifier for the lockable object
452  *      isTempObject: is the lockable object a temporary object?  (Under 2PC,
453  *              such locks cannot be persisted)
454  *      lockmode: lock mode to acquire
455  *      sessionLock: if true, acquire lock for session not current transaction
456  *      dontWait: if true, don't wait to acquire lock
457  *
458  * Returns one of:
459  *              LOCKACQUIRE_NOT_AVAIL           lock not available, and dontWait=true
460  *              LOCKACQUIRE_OK                          lock successfully acquired
461  *              LOCKACQUIRE_ALREADY_HELD        incremented count for lock already held
462  *
463  * In the normal case where dontWait=false and the caller doesn't need to
464  * distinguish a freshly acquired lock from one already taken earlier in
465  * this same transaction, there is no need to examine the return value.
466  *
467  * Side Effects: The lock is acquired and recorded in lock tables.
468  *
469  * NOTE: if we wait for the lock, there is no way to abort the wait
470  * short of aborting the transaction.
471  */
472 LockAcquireResult
473 LockAcquire(const LOCKTAG *locktag,
474                         bool isTempObject,
475                         LOCKMODE lockmode,
476                         bool sessionLock,
477                         bool dontWait)
478 {
479         LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
480         LockMethod      lockMethodTable;
481         LOCALLOCKTAG localtag;
482         LOCALLOCK  *locallock;
483         LOCK       *lock;
484         PROCLOCK   *proclock;
485         PROCLOCKTAG proclocktag;
486         bool            found;
487         ResourceOwner owner;
488         uint32          hashcode;
489         uint32          proclock_hashcode;
490         int                     partition;
491         LWLockId        partitionLock;
492         int                     status;
493
494         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
495                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
496         lockMethodTable = LockMethods[lockmethodid];
497         if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
498                 elog(ERROR, "unrecognized lock mode: %d", lockmode);
499
500 #ifdef LOCK_DEBUG
501         if (LOCK_DEBUG_ENABLED(locktag))
502                 elog(LOG, "LockAcquire: lock [%u,%u] %s",
503                          locktag->locktag_field1, locktag->locktag_field2,
504                          lockMethodTable->lockModeNames[lockmode]);
505 #endif
506
507         /* Session locks are never transactional, else check table */
508         if (!sessionLock && lockMethodTable->transactional)
509                 owner = CurrentResourceOwner;
510         else
511                 owner = NULL;
512
513         /*
514          * Find or create a LOCALLOCK entry for this lock and lockmode
515          */
516         MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
517         localtag.lock = *locktag;
518         localtag.mode = lockmode;
519
520         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
521                                                                                   (void *) &localtag,
522                                                                                   HASH_ENTER, &found);
523
524         /*
525          * if it's a new locallock object, initialize it
526          */
527         if (!found)
528         {
529                 locallock->lock = NULL;
530                 locallock->proclock = NULL;
531                 locallock->isTempObject = isTempObject;
532                 locallock->hashcode = LockTagHashCode(&(localtag.lock));
533                 locallock->nLocks = 0;
534                 locallock->numLockOwners = 0;
535                 locallock->maxLockOwners = 8;
536                 locallock->lockOwners = NULL;
537                 locallock->lockOwners = (LOCALLOCKOWNER *)
538                         MemoryContextAlloc(TopMemoryContext,
539                                                   locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
540         }
541         else
542         {
543                 Assert(locallock->isTempObject == isTempObject);
544
545                 /* Make sure there will be room to remember the lock */
546                 if (locallock->numLockOwners >= locallock->maxLockOwners)
547                 {
548                         int                     newsize = locallock->maxLockOwners * 2;
549
550                         locallock->lockOwners = (LOCALLOCKOWNER *)
551                                 repalloc(locallock->lockOwners,
552                                                  newsize * sizeof(LOCALLOCKOWNER));
553                         locallock->maxLockOwners = newsize;
554                 }
555         }
556
557         /*
558          * If we already hold the lock, we can just increase the count locally.
559          */
560         if (locallock->nLocks > 0)
561         {
562                 GrantLockLocal(locallock, owner);
563                 return LOCKACQUIRE_ALREADY_HELD;
564         }
565
566         /*
567          * Otherwise we've got to mess with the shared lock table.
568          */
569         hashcode = locallock->hashcode;
570         partition = LockHashPartition(hashcode);
571         partitionLock = LockHashPartitionLock(hashcode);
572
573         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
574
575         /*
576          * Find or create a lock with this tag.
577          *
578          * Note: if the locallock object already existed, it might have a pointer
579          * to the lock already ... but we probably should not assume that that
580          * pointer is valid, since a lock object with no locks can go away
581          * anytime.
582          */
583         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
584                                                                                                 (void *) locktag,
585                                                                                                 hashcode,
586                                                                                                 HASH_ENTER_NULL,
587                                                                                                 &found);
588         if (!lock)
589         {
590                 LWLockRelease(partitionLock);
591                 ereport(ERROR,
592                                 (errcode(ERRCODE_OUT_OF_MEMORY),
593                                  errmsg("out of shared memory"),
594                         errhint("You may need to increase max_locks_per_transaction.")));
595         }
596         locallock->lock = lock;
597
598         /*
599          * if it's a new lock object, initialize it
600          */
601         if (!found)
602         {
603                 lock->grantMask = 0;
604                 lock->waitMask = 0;
605                 SHMQueueInit(&(lock->procLocks));
606                 ProcQueueInit(&(lock->waitProcs));
607                 lock->nRequested = 0;
608                 lock->nGranted = 0;
609                 MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
610                 MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
611                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
612         }
613         else
614         {
615                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
616                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
617                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
618                 Assert(lock->nGranted <= lock->nRequested);
619         }
620
621         /*
622          * Create the hash key for the proclock table.
623          */
624         proclocktag.myLock = lock;
625         proclocktag.myProc = MyProc;
626
627         proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
628
629         /*
630          * Find or create a proclock entry with this tag
631          */
632         proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
633                                                                                                                 (void *) &proclocktag,
634                                                                                                                 proclock_hashcode,
635                                                                                                                 HASH_ENTER_NULL,
636                                                                                                                 &found);
637         if (!proclock)
638         {
639                 /* Oops, not enough shmem for the proclock */
640                 if (lock->nRequested == 0)
641                 {
642                         /*
643                          * There are no other requestors of this lock, so garbage-collect
644                          * the lock object.  We *must* do this to avoid a permanent leak
645                          * of shared memory, because there won't be anything to cause
646                          * anyone to release the lock object later.
647                          */
648                         Assert(SHMQueueEmpty(&(lock->procLocks)));
649                         if (!hash_search_with_hash_value(LockMethodLockHash,
650                                                                                          (void *) &(lock->tag),
651                                                                                          hashcode,
652                                                                                          HASH_REMOVE,
653                                                                                          NULL))
654                                 elog(PANIC, "lock table corrupted");
655                 }
656                 LWLockRelease(partitionLock);
657                 ereport(ERROR,
658                                 (errcode(ERRCODE_OUT_OF_MEMORY),
659                                  errmsg("out of shared memory"),
660                         errhint("You may need to increase max_locks_per_transaction.")));
661         }
662         locallock->proclock = proclock;
663
664         /*
665          * If new, initialize the new entry
666          */
667         if (!found)
668         {
669                 proclock->holdMask = 0;
670                 proclock->releaseMask = 0;
671                 /* Add proclock to appropriate lists */
672                 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
673                 SHMQueueInsertBefore(&(MyProc->myProcLocks[partition]),
674                                                          &proclock->procLink);
675                 PROCLOCK_PRINT("LockAcquire: new", proclock);
676         }
677         else
678         {
679                 PROCLOCK_PRINT("LockAcquire: found", proclock);
680                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
681
682 #ifdef CHECK_DEADLOCK_RISK
683
684                 /*
685                  * Issue warning if we already hold a lower-level lock on this object
686                  * and do not hold a lock of the requested level or higher. This
687                  * indicates a deadlock-prone coding practice (eg, we'd have a
688                  * deadlock if another backend were following the same code path at
689                  * about the same time).
690                  *
691                  * This is not enabled by default, because it may generate log entries
692                  * about user-level coding practices that are in fact safe in context.
693                  * It can be enabled to help find system-level problems.
694                  *
695                  * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
696                  * better to use a table.  For now, though, this works.
697                  */
698                 {
699                         int                     i;
700
701                         for (i = lockMethodTable->numLockModes; i > 0; i--)
702                         {
703                                 if (proclock->holdMask & LOCKBIT_ON(i))
704                                 {
705                                         if (i >= (int) lockmode)
706                                                 break;  /* safe: we have a lock >= req level */
707                                         elog(LOG, "deadlock risk: raising lock level"
708                                                  " from %s to %s on object %u/%u/%u",
709                                                  lockMethodTable->lockModeNames[i],
710                                                  lockMethodTable->lockModeNames[lockmode],
711                                                  lock->tag.locktag_field1, lock->tag.locktag_field2,
712                                                  lock->tag.locktag_field3);
713                                         break;
714                                 }
715                         }
716                 }
717 #endif   /* CHECK_DEADLOCK_RISK */
718         }
719
720         /*
721          * lock->nRequested and lock->requested[] count the total number of
722          * requests, whether granted or waiting, so increment those immediately.
723          * The other counts don't increment till we get the lock.
724          */
725         lock->nRequested++;
726         lock->requested[lockmode]++;
727         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
728
729         /*
730          * We shouldn't already hold the desired lock; else locallock table is
731          * broken.
732          */
733         if (proclock->holdMask & LOCKBIT_ON(lockmode))
734                 elog(ERROR, "lock %s on object %u/%u/%u is already held",
735                          lockMethodTable->lockModeNames[lockmode],
736                          lock->tag.locktag_field1, lock->tag.locktag_field2,
737                          lock->tag.locktag_field3);
738
739         /*
740          * If the requested lock conflicts with locks requested by waiters, we must
741          * join the wait queue.  Otherwise, check for conflict with already-held
742          * locks.  (That check is done last because it is the most complex.)
743          */
744         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
745                 status = STATUS_FOUND;
746         else
747                 status = LockCheckConflicts(lockMethodTable, lockmode,
748                                                                         lock, proclock, MyProc);
749
750         if (status == STATUS_OK)
751         {
752                 /* No conflict with held or previously requested locks */
753                 GrantLock(lock, proclock, lockmode);
754                 GrantLockLocal(locallock, owner);
755         }
756         else
757         {
758                 Assert(status == STATUS_FOUND);
759
760                 /*
761                  * We can't acquire the lock immediately.  If caller specified no
762                  * blocking, remove useless table entries and return NOT_AVAIL without
763                  * waiting.
764                  */
765                 if (dontWait)
766                 {
767                         if (proclock->holdMask == 0)
768                         {
769                                 SHMQueueDelete(&proclock->lockLink);
770                                 SHMQueueDelete(&proclock->procLink);
771                                 if (!hash_search_with_hash_value(LockMethodProcLockHash,
772                                                                                                  (void *) &(proclock->tag),
773                                                                                                  proclock_hashcode,
774                                                                                                  HASH_REMOVE,
775                                                                                                  NULL))
776                                         elog(PANIC, "proclock table corrupted");
777                         }
778                         else
779                                 PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
780                         lock->nRequested--;
781                         lock->requested[lockmode]--;
782                         LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
783                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
784                         Assert(lock->nGranted <= lock->nRequested);
785                         LWLockRelease(partitionLock);
786                         if (locallock->nLocks == 0)
787                                 RemoveLocalLock(locallock);
788                         return LOCKACQUIRE_NOT_AVAIL;
789                 }
790
791                 /*
792                  * Set bitmask of locks this process already holds on this object.
793                  */
794                 MyProc->heldLocks = proclock->holdMask;
795
796                 /*
797                  * Sleep till someone wakes me up.
798                  */
799                 WaitOnLock(locallock, owner);
800
801                 /*
802                  * NOTE: do not do any material change of state between here and
803                  * return.      All required changes in locktable state must have been
804                  * done when the lock was granted to us --- see notes in WaitOnLock.
805                  */
806
807                 /*
808                  * Check the proclock entry status, in case something in the ipc
809                  * communication doesn't work correctly.
810                  */
811                 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
812                 {
813                         PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
814                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
815                         /* Should we retry ? */
816                         LWLockRelease(partitionLock);
817                         elog(ERROR, "LockAcquire failed");
818                 }
819                 PROCLOCK_PRINT("LockAcquire: granted", proclock);
820                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
821         }
822
823         LWLockRelease(partitionLock);
824
825         return LOCKACQUIRE_OK;
826 }
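/*
 * A minimal usage sketch (real callers normally go through lmgr.c wrappers
 * such as LockRelation; "relid" is just a placeholder OID here):
 *
 *		LOCKTAG		tag;
 *
 *		SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *		if (LockAcquire(&tag, false, ShareLock, false, true) ==
 *			LOCKACQUIRE_NOT_AVAIL)
 *			elog(LOG, "could not acquire lock without waiting");
 *		else
 *			LockRelease(&tag, ShareLock, false);
 */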
827
828 /*
829  * Subroutine to free a locallock entry
830  */
831 static void
832 RemoveLocalLock(LOCALLOCK *locallock)
833 {
834         pfree(locallock->lockOwners);
835         locallock->lockOwners = NULL;
836         if (!hash_search(LockMethodLocalHash,
837                                          (void *) &(locallock->tag),
838                                          HASH_REMOVE, NULL))
839                 elog(WARNING, "locallock table corrupted");
840 }
841
842 /*
843  * LockCheckConflicts -- test whether requested lock conflicts
844  *              with those already granted
845  *
846  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
847  *
848  * NOTES:
849  *              Here's what makes this complicated: one process's locks don't
850  * conflict with one another, no matter what purpose they are held for
851  * (eg, session and transaction locks do not conflict).
852  * So, we must subtract off our own locks when determining whether the
853  * requested new lock conflicts with those already held.
854  */
855 int
856 LockCheckConflicts(LockMethod lockMethodTable,
857                                    LOCKMODE lockmode,
858                                    LOCK *lock,
859                                    PROCLOCK *proclock,
860                                    PGPROC *proc)
861 {
862         int                     numLockModes = lockMethodTable->numLockModes;
863         LOCKMASK        myLocks;
864         LOCKMASK        otherLocks;
865         int                     i;
866
867         /*
868          * first check for global conflicts: If no locks conflict with my request,
869          * then I get the lock.
870          *
871          * Checking for conflict: lock->grantMask represents the types of
872          * currently held locks.  conflictTab[lockmode] has a bit set for each
873          * type of lock that conflicts with the request.  A bitwise AND tells if
874          * there is a conflict.
875          */
876         if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask))
877         {
878                 PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
879                 return STATUS_OK;
880         }
881
882         /*
883          * Rats.  Something conflicts.  But it could still be my own lock. We have
884          * to construct a conflict mask that does not reflect our own locks, but
885          * only lock types held by other processes.
886          */
887         myLocks = proclock->holdMask;
888         otherLocks = 0;
889         for (i = 1; i <= numLockModes; i++)
890         {
891                 int                     myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
892
893                 if (lock->granted[i] > myHolding)
894                         otherLocks |= LOCKBIT_ON(i);
895         }
896
897         /*
898          * now check again for conflicts.  'otherLocks' describes the types of
899          * locks held by other processes.  If one of these conflicts with the kind
900          * of lock that I want, there is a conflict and I have to sleep.
901          */
902         if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
903         {
904                 /* no conflict. OK to get the lock */
905                 PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
906                 return STATUS_OK;
907         }
908
909         PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock);
910         return STATUS_FOUND;
911 }
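/*
 * Worked example: suppose this backend already holds RowExclusiveLock on an
 * object and now requests ShareLock, while one other backend also holds
 * RowExclusiveLock.  grantMask includes RowExclusiveLock, which conflicts
 * with ShareLock, so the first grantMask test fails; but after subtracting
 * our own holdings, otherLocks still contains RowExclusiveLock (the other
 * backend's), so we must sleep.  Had no other backend held a conflicting
 * mode, otherLocks would be empty and the request would succeed despite our
 * own "conflicting" lock.
 */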
912
913 /*
914  * GrantLock -- update the lock and proclock data structures to show
915  *              the lock request has been granted.
916  *
917  * NOTE: if proc was blocked, it also needs to be removed from the wait list
918  * and have its waitLock/waitProcLock fields cleared.  That's not done here.
919  *
920  * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
921  * table entry; but since we may be awaking some other process, we can't do
922  * that here; it's done by GrantLockLocal, instead.
923  */
924 void
925 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
926 {
927         lock->nGranted++;
928         lock->granted[lockmode]++;
929         lock->grantMask |= LOCKBIT_ON(lockmode);
930         if (lock->granted[lockmode] == lock->requested[lockmode])
931                 lock->waitMask &= LOCKBIT_OFF(lockmode);
932         proclock->holdMask |= LOCKBIT_ON(lockmode);
933         LOCK_PRINT("GrantLock", lock, lockmode);
934         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
935         Assert(lock->nGranted <= lock->nRequested);
936 }
937
938 /*
939  * UnGrantLock -- opposite of GrantLock.
940  *
941  * Updates the lock and proclock data structures to show that the lock
942  * is no longer held nor requested by the current holder.
943  *
944  * Returns true if there were any waiters waiting on the lock that
945  * should now be woken up with ProcLockWakeup.
946  */
947 static bool
948 UnGrantLock(LOCK *lock, LOCKMODE lockmode,
949                         PROCLOCK *proclock, LockMethod lockMethodTable)
950 {
951         bool            wakeupNeeded = false;
952
953         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
954         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
955         Assert(lock->nGranted <= lock->nRequested);
956
957         /*
958          * fix the general lock stats
959          */
960         lock->nRequested--;
961         lock->requested[lockmode]--;
962         lock->nGranted--;
963         lock->granted[lockmode]--;
964
965         if (lock->granted[lockmode] == 0)
966         {
967                 /* change the conflict mask.  No more of this lock type. */
968                 lock->grantMask &= LOCKBIT_OFF(lockmode);
969         }
970
971         LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
972
973         /*
974          * We need only run ProcLockWakeup if the released lock conflicts with at
975          * least one of the lock types requested by waiter(s).  Otherwise whatever
976          * conflict made them wait must still exist.  NOTE: before MVCC, we could
977          * skip wakeup if lock->granted[lockmode] was still positive. But that's
978          * not true anymore, because the remaining granted locks might belong to
979          * some waiter, who could now be awakened because he doesn't conflict with
980          * his own locks.
981          */
982         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
983                 wakeupNeeded = true;
984
985         /*
986          * Now fix the per-proclock state.
987          */
988         proclock->holdMask &= LOCKBIT_OFF(lockmode);
989         PROCLOCK_PRINT("UnGrantLock: updated", proclock);
990
991         return wakeupNeeded;
992 }
993
994 /*
995  * CleanUpLock -- clean up after releasing a lock.      We garbage-collect the
996  * proclock and lock objects if possible, and call ProcLockWakeup if there
997  * are remaining requests and the caller says it's OK.  (Normally, this
998  * should be called after UnGrantLock, and wakeupNeeded is the result from
999  * UnGrantLock.)
1000  *
1001  * The appropriate partition lock must be held at entry, and will be
1002  * held at exit.
1003  */
1004 static void
1005 CleanUpLock(LOCK *lock, PROCLOCK *proclock,
1006                         LockMethod lockMethodTable, uint32 hashcode,
1007                         bool wakeupNeeded)
1008 {
1009         /*
1010          * If this was my last hold on this lock, delete my entry in the proclock
1011          * table.
1012          */
1013         if (proclock->holdMask == 0)
1014         {
1015                 uint32          proclock_hashcode;
1016
1017                 PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
1018                 SHMQueueDelete(&proclock->lockLink);
1019                 SHMQueueDelete(&proclock->procLink);
1020                 proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
1021                 if (!hash_search_with_hash_value(LockMethodProcLockHash,
1022                                                                                  (void *) &(proclock->tag),
1023                                                                                  proclock_hashcode,
1024                                                                                  HASH_REMOVE,
1025                                                                                  NULL))
1026                         elog(PANIC, "proclock table corrupted");
1027         }
1028
1029         if (lock->nRequested == 0)
1030         {
1031                 /*
1032                  * The caller just released the last lock, so garbage-collect the lock
1033                  * object.
1034                  */
1035                 LOCK_PRINT("CleanUpLock: deleting", lock, 0);
1036                 Assert(SHMQueueEmpty(&(lock->procLocks)));
1037                 if (!hash_search_with_hash_value(LockMethodLockHash,
1038                                                                                  (void *) &(lock->tag),
1039                                                                                  hashcode,
1040                                                                                  HASH_REMOVE,
1041                                                                                  NULL))
1042                         elog(PANIC, "lock table corrupted");
1043         }
1044         else if (wakeupNeeded)
1045         {
1046                 /* There are waiters on this lock, so wake them up. */
1047                 ProcLockWakeup(lockMethodTable, lock);
1048         }
1049 }
1050
1051 /*
1052  * GrantLockLocal -- update the locallock data structures to show
1053  *              the lock request has been granted.
1054  *
1055  * We expect that LockAcquire made sure there is room to add a new
1056  * ResourceOwner entry.
1057  */
1058 static void
1059 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
1060 {
1061         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1062         int                     i;
1063
1064         Assert(locallock->numLockOwners < locallock->maxLockOwners);
1065         /* Count the total */
1066         locallock->nLocks++;
1067         /* Count the per-owner lock */
1068         for (i = 0; i < locallock->numLockOwners; i++)
1069         {
1070                 if (lockOwners[i].owner == owner)
1071                 {
1072                         lockOwners[i].nLocks++;
1073                         return;
1074                 }
1075         }
1076         lockOwners[i].owner = owner;
1077         lockOwners[i].nLocks = 1;
1078         locallock->numLockOwners++;
1079 }
1080
1081 /*
1082  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
1083  *              WaitOnLock on.
1084  *
1085  * proc.c needs this for the case where we are booted off the lock by
1086  * timeout, but discover that someone granted us the lock anyway.
1087  *
1088  * We could just export GrantLockLocal, but that would require including
1089  * resowner.h in lock.h, which creates circularity.
1090  */
1091 void
1092 GrantAwaitedLock(void)
1093 {
1094         GrantLockLocal(awaitedLock, awaitedOwner);
1095 }
1096
1097 /*
1098  * WaitOnLock -- wait to acquire a lock
1099  *
1100  * Caller must have set MyProc->heldLocks to reflect locks already held
1101  * on the lockable object by this process.
1102  *
1103  * The appropriate partition lock must be held at entry.
1104  */
1105 static void
1106 WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
1107 {
1108         LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
1109         LockMethod      lockMethodTable = LockMethods[lockmethodid];
1110         const char *old_status;
1111         char       *new_status = NULL;
1112         int                     len;
1113
1114         LOCK_PRINT("WaitOnLock: sleeping on lock",
1115                            locallock->lock, locallock->tag.mode);
1116
1117         if (update_process_title)
1118         {
1119                 old_status = get_ps_display(&len);
1120                 new_status = (char *) palloc(len + 8 + 1);
1121                 memcpy(new_status, old_status, len);
1122                 strcpy(new_status + len, " waiting");
1123                 set_ps_display(new_status, false);
1124                 new_status[len] = '\0';         /* truncate off " waiting" */
1125         }
1126         
1127         awaitedLock = locallock;
1128         awaitedOwner = owner;
1129
1130         /*
1131          * NOTE: Think not to put any shared-state cleanup after the call to
1132          * ProcSleep, in either the normal or failure path.  The lock state must
1133          * be fully set by the lock grantor, or by CheckDeadLock if we give up
1134          * waiting for the lock.  This is necessary because of the possibility
1135          * that a cancel/die interrupt will interrupt ProcSleep after someone else
1136          * grants us the lock, but before we've noticed it. Hence, after granting,
1137          * the locktable state must fully reflect the fact that we own the lock;
1138          * we can't do additional work on return. Contrariwise, if we fail, any
1139          * cleanup must happen in xact abort processing, not here, to ensure it
1140          * will also happen in the cancel/die case.
1141          */
1142
1143         if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
1144         {
1145                 /*
1146                  * We failed as a result of a deadlock, see CheckDeadLock(). Quit now.
1147                  */
1148                 awaitedLock = NULL;
1149                 LOCK_PRINT("WaitOnLock: aborting on lock",
1150                                    locallock->lock, locallock->tag.mode);
1151                 LWLockRelease(LockHashPartitionLock(locallock->hashcode));
1152
1153                 /*
1154                  * Now that we aren't holding the partition lock, we can give an error
1155                  * report including details about the detected deadlock.
1156                  */
1157                 DeadLockReport();
1158                 /* not reached */
1159         }
1160
1161         awaitedLock = NULL;
1162
1163         if (update_process_title)
1164         {
1165                 set_ps_display(new_status, false);
1166                 pfree(new_status);
1167         }
1168
1169         LOCK_PRINT("WaitOnLock: wakeup on lock",
1170                            locallock->lock, locallock->tag.mode);
1171 }
1172
1173 /*
1174  * Remove a proc from the wait-queue it is on (caller must know it is on one).
1175  * This is only used when the proc has failed to get the lock, so we set its
1176  * waitStatus to STATUS_ERROR.
1177  *
1178  * Appropriate partition lock must be held by caller.  Also, caller is
1179  * responsible for signaling the proc if needed.
1180  *
1181  * NB: this does not clean up any locallock object that may exist for the lock.
1182  */
1183 void
1184 RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
1185 {
1186         LOCK       *waitLock = proc->waitLock;
1187         PROCLOCK   *proclock = proc->waitProcLock;
1188         LOCKMODE        lockmode = proc->waitLockMode;
1189         LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1190
1191         /* Make sure proc is waiting */
1192         Assert(proc->waitStatus == STATUS_WAITING);
1193         Assert(proc->links.next != INVALID_OFFSET);
1194         Assert(waitLock);
1195         Assert(waitLock->waitProcs.size > 0);
1196         Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
1197
1198         /* Remove proc from lock's wait queue */
1199         SHMQueueDelete(&(proc->links));
1200         waitLock->waitProcs.size--;
1201
1202         /* Undo increments of request counts by waiting process */
1203         Assert(waitLock->nRequested > 0);
1204         Assert(waitLock->nRequested > proc->waitLock->nGranted);
1205         waitLock->nRequested--;
1206         Assert(waitLock->requested[lockmode] > 0);
1207         waitLock->requested[lockmode]--;
1208         /* don't forget to clear waitMask bit if appropriate */
1209         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1210                 waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1211
1212         /* Clean up the proc's own state, and pass it the ok/fail signal */
1213         proc->waitLock = NULL;
1214         proc->waitProcLock = NULL;
1215         proc->waitStatus = STATUS_ERROR;
1216
1217         /*
1218          * Delete the proclock immediately if it represents no already-held locks.
1219          * (This must happen now because if the owner of the lock decides to
1220          * release it, and the requested/granted counts then go to zero,
1221          * LockRelease expects there to be no remaining proclocks.) Then see if
1222          * any other waiters for the lock can be woken up now.
1223          */
1224         CleanUpLock(waitLock, proclock,
1225                                 LockMethods[lockmethodid], hashcode,
1226                                 true);
1227 }
1228
1229 /*
1230  * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
1231  *              Release a session lock if 'sessionLock' is true, else release a
1232  *              regular transaction lock.
1233  *
1234  * Side Effects: find any waiting processes that are now wakable,
1235  *              grant them their requested locks and awaken them.
1236  *              (We have to grant the lock here to avoid a race between
1237  *              the waking process and any new process to
1238  *              come along and request the lock.)
1239  */
1240 bool
1241 LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
1242 {
1243         LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
1244         LockMethod      lockMethodTable;
1245         LOCALLOCKTAG localtag;
1246         LOCALLOCK  *locallock;
1247         LOCK       *lock;
1248         PROCLOCK   *proclock;
1249         LWLockId        partitionLock;
1250         bool            wakeupNeeded;
1251
1252         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
1253                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
1254         lockMethodTable = LockMethods[lockmethodid];
1255         if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
1256                 elog(ERROR, "unrecognized lock mode: %d", lockmode);
1257
1258 #ifdef LOCK_DEBUG
1259         if (LOCK_DEBUG_ENABLED(locktag))
1260                 elog(LOG, "LockRelease: lock [%u,%u] %s",
1261                          locktag->locktag_field1, locktag->locktag_field2,
1262                          lockMethodTable->lockModeNames[lockmode]);
1263 #endif
1264
1265         /*
1266          * Find the LOCALLOCK entry for this lock and lockmode
1267          */
1268         MemSet(&localtag, 0, sizeof(localtag));         /* must clear padding */
1269         localtag.lock = *locktag;
1270         localtag.mode = lockmode;
1271
1272         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
1273                                                                                   (void *) &localtag,
1274                                                                                   HASH_FIND, NULL);
1275
1276         /*
1277          * let the caller print its own error message, too. Do not ereport(ERROR).
1278          */
1279         if (!locallock || locallock->nLocks <= 0)
1280         {
1281                 elog(WARNING, "you don't own a lock of type %s",
1282                          lockMethodTable->lockModeNames[lockmode]);
1283                 return FALSE;
1284         }
1285
1286         /*
1287          * Decrease the count for the resource owner.
1288          */
1289         {
1290                 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1291                 ResourceOwner owner;
1292                 int                     i;
1293
1294                 /* Session locks are never transactional, else check table */
1295                 if (!sessionLock && lockMethodTable->transactional)
1296                         owner = CurrentResourceOwner;
1297                 else
1298                         owner = NULL;
1299
1300                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1301                 {
1302                         if (lockOwners[i].owner == owner)
1303                         {
1304                                 Assert(lockOwners[i].nLocks > 0);
1305                                 if (--lockOwners[i].nLocks == 0)
1306                                 {
1307                                         /* compact out unused slot */
1308                                         locallock->numLockOwners--;
1309                                         if (i < locallock->numLockOwners)
1310                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1311                                 }
1312                                 break;
1313                         }
1314                 }
1315                 if (i < 0)
1316                 {
1317                         /* don't release a lock belonging to another owner */
1318                         elog(WARNING, "you don't own a lock of type %s",
1319                                  lockMethodTable->lockModeNames[lockmode]);
1320                         return FALSE;
1321                 }
1322         }
1323
1324         /*
1325          * Decrease the total local count.      If we're still holding the lock, we're
1326          * done.
1327          */
1328         locallock->nLocks--;
1329
1330         if (locallock->nLocks > 0)
1331                 return TRUE;
1332
1333         /*
1334          * Otherwise we've got to mess with the shared lock table.
1335          */
1336         partitionLock = LockHashPartitionLock(locallock->hashcode);
1337
1338         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1339
1340         /*
1341          * We don't need to re-find the lock or proclock, since we kept their
1342          * addresses in the locallock table, and they couldn't have been removed
1343          * while we were holding a lock on them.
1344          */
1345         lock = locallock->lock;
1346         LOCK_PRINT("LockRelease: found", lock, lockmode);
1347         proclock = locallock->proclock;
1348         PROCLOCK_PRINT("LockRelease: found", proclock);
1349
1350         /*
1351          * Double-check that we are actually holding a lock of the type we want to
1352          * release.
1353          */
1354         if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1355         {
1356                 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
1357                 LWLockRelease(partitionLock);
1358                 elog(WARNING, "you don't own a lock of type %s",
1359                          lockMethodTable->lockModeNames[lockmode]);
1360                 RemoveLocalLock(locallock);
1361                 return FALSE;
1362         }
1363
1364         /*
1365          * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
1366          */
1367         wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
1368
1369         CleanUpLock(lock, proclock,
1370                                 lockMethodTable, locallock->hashcode,
1371                                 wakeupNeeded);
1372
1373         LWLockRelease(partitionLock);
1374
1375         RemoveLocalLock(locallock);
1376         return TRUE;
1377 }
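/*
 * A minimal usage sketch (illustrative, not part of the lock manager): a
 * caller that built its own LOCKTAG for a relation lock would release it
 * like this, where relid is a placeholder relation OID and the lock is
 * assumed to have been obtained earlier via LockAcquire():
 *
 *		LOCKTAG		tag;
 *
 *		SET_LOCKTAG_RELATION(tag, MyDatabaseId, relid);
 *		if (!LockRelease(&tag, AccessShareLock, false))
 *			elog(WARNING, "relation lock was not held");
 *
 * In practice almost all callers go through the wrappers in lmgr.c rather
 * than constructing LOCKTAGs by hand.
 */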
1378
1379 /*
1380  * LockReleaseAll -- Release all locks of the specified lock method that
1381  *              are held by the current process.
1382  *
1383  * Well, not necessarily *all* locks.  The available behaviors are:
1384  *              allLocks == true: release all locks including session locks.
1385  *              allLocks == false: release all non-session locks.
1386  */
1387 void
1388 LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
1389 {
1390         HASH_SEQ_STATUS status;
1391         LockMethod      lockMethodTable;
1392         int                     i,
1393                                 numLockModes;
1394         LOCALLOCK  *locallock;
1395         LOCK       *lock;
1396         PROCLOCK   *proclock;
1397         int                     partition;
1398
1399         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
1400                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
1401         lockMethodTable = LockMethods[lockmethodid];
1402
1403 #ifdef LOCK_DEBUG
1404         if (*(lockMethodTable->trace_flag))
1405                 elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
1406 #endif
1407
1408         numLockModes = lockMethodTable->numLockModes;
1409
1410         /*
1411          * First we run through the locallock table and get rid of unwanted
1412          * entries, then we scan the process's proclocks and get rid of those. We
1413          * do this separately because we may have multiple locallock entries
1414          * pointing to the same proclock, and we daren't end up with any dangling
1415          * pointers.
1416          */
1417         hash_seq_init(&status, LockMethodLocalHash);
1418
1419         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1420         {
1421                 if (locallock->proclock == NULL || locallock->lock == NULL)
1422                 {
1423                         /*
1424                          * We must've run out of shared memory while trying to set up this
1425                          * lock.  Just forget the local entry.
1426                          */
1427                         Assert(locallock->nLocks == 0);
1428                         RemoveLocalLock(locallock);
1429                         continue;
1430                 }
1431
1432                 /* Ignore items that are not of the lockmethod to be removed */
1433                 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
1434                         continue;
1435
1436                 /*
1437                  * If we are asked to release all locks, we can just zap the entry.
1438                  * Otherwise, must scan to see if there are session locks. We assume
1439                  * there is at most one lockOwners entry for session locks.
1440                  */
1441                 if (!allLocks)
1442                 {
1443                         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1444
1445                         /* If the session-lock entry (owner == NULL) is above slot 0, copy it down to slot 0 */
1446                         for (i = locallock->numLockOwners - 1; i > 0; i--)
1447                         {
1448                                 if (lockOwners[i].owner == NULL)
1449                                 {
1450                                         lockOwners[0] = lockOwners[i];
1451                                         break;
1452                                 }
1453                         }
1454
1455                         if (locallock->numLockOwners > 0 &&
1456                                 lockOwners[0].owner == NULL &&
1457                                 lockOwners[0].nLocks > 0)
1458                         {
1459                                 /* Fix the locallock to show just the session locks */
1460                                 locallock->nLocks = lockOwners[0].nLocks;
1461                                 locallock->numLockOwners = 1;
1462                                 /* We aren't deleting this locallock, so done */
1463                                 continue;
1464                         }
1465                 }
1466
1467                 /* Mark the proclock to show we need to release this lockmode */
1468                 if (locallock->nLocks > 0)
1469                         locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
1470
1471                 /* And remove the locallock hashtable entry */
1472                 RemoveLocalLock(locallock);
1473         }
1474
1475         /*
1476          * Now, scan each lock partition separately.
1477          */
1478         for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
1479         {
1480                 LWLockId        partitionLock = FirstLockMgrLock + partition;
1481                 SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
1482
1483                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1484                                                                                          offsetof(PROCLOCK, procLink));
1485
1486                 if (!proclock)
1487                         continue;                       /* needn't examine this partition */
1488
1489                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1490
1491                 while (proclock)
1492                 {
1493                         bool            wakeupNeeded = false;
1494                         PROCLOCK   *nextplock;
1495
1496                         /* Get link first, since we may unlink/delete this proclock */
1497                         nextplock = (PROCLOCK *)
1498                                 SHMQueueNext(procLocks, &proclock->procLink,
1499                                                          offsetof(PROCLOCK, procLink));
1500
1501                         Assert(proclock->tag.myProc == MyProc);
1502
1503                         lock = proclock->tag.myLock;
1504
1505                         /* Ignore items that are not of the lockmethod to be removed */
1506                         if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
1507                                 goto next_item;
1508
1509                         /*
1510                          * In allLocks mode, force release of all locks even if locallock
1511                          * table had problems
1512                          */
1513                         if (allLocks)
1514                                 proclock->releaseMask = proclock->holdMask;
1515                         else
1516                                 Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
1517
1518                         /*
1519                          * Ignore items that have nothing to be released, unless they have
1520                          * holdMask == 0 and are therefore recyclable
1521                          */
1522                         if (proclock->releaseMask == 0 && proclock->holdMask != 0)
1523                                 goto next_item;
1524
1525                         PROCLOCK_PRINT("LockReleaseAll", proclock);
1526                         LOCK_PRINT("LockReleaseAll", lock, 0);
1527                         Assert(lock->nRequested >= 0);
1528                         Assert(lock->nGranted >= 0);
1529                         Assert(lock->nGranted <= lock->nRequested);
1530                         Assert((proclock->holdMask & ~lock->grantMask) == 0);
1531
1532                         /*
1533                          * Release the previously-marked lock modes
1534                          */
1535                         for (i = 1; i <= numLockModes; i++)
1536                         {
1537                                 if (proclock->releaseMask & LOCKBIT_ON(i))
1538                                         wakeupNeeded |= UnGrantLock(lock, i, proclock,
1539                                                                                                 lockMethodTable);
1540                         }
1541                         Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1542                         Assert(lock->nGranted <= lock->nRequested);
1543                         LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1544
1545                         proclock->releaseMask = 0;
1546
1547                         /* CleanUpLock will wake up waiters if needed. */
1548                         CleanUpLock(lock, proclock,
1549                                                 lockMethodTable,
1550                                                 LockTagHashCode(&lock->tag),
1551                                                 wakeupNeeded);
1552
1553                 next_item:
1554                         proclock = nextplock;
1555                 } /* loop over PROCLOCKs within this partition */
1556
1557                 LWLockRelease(partitionLock);
1558         } /* loop over partitions */
1559
1560 #ifdef LOCK_DEBUG
1561         if (*(lockMethodTable->trace_flag))
1562                 elog(LOG, "LockReleaseAll done");
1563 #endif
1564 }
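/*
 * A worked example of the releaseMask arithmetic above (mode numbers are the
 * standard ones from lock.h: AccessShareLock = 1, RowExclusiveLock = 3, and
 * LOCKBIT_ON(m) is just 1 << m).  Suppose a backend holds an AccessShareLock
 * as a session lock plus a transactional RowExclusiveLock on the same lock:
 *
 *		holdMask    = LOCKBIT_ON(1) | LOCKBIT_ON(3) = 0x0A
 *
 * A non-allLocks LockReleaseAll marks only the transactional mode:
 *
 *		releaseMask = LOCKBIT_ON(3)                 = 0x08
 *
 * so the loop over lock modes calls UnGrantLock() just for mode 3 and the
 * session-held AccessShareLock stays set in holdMask.
 */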
1565
1566 /*
1567  * LockReleaseCurrentOwner
1568  *              Release all locks belonging to CurrentResourceOwner
1569  */
1570 void
1571 LockReleaseCurrentOwner(void)
1572 {
1573         HASH_SEQ_STATUS status;
1574         LOCALLOCK  *locallock;
1575         LOCALLOCKOWNER *lockOwners;
1576         int                     i;
1577
1578         hash_seq_init(&status, LockMethodLocalHash);
1579
1580         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1581         {
1582                 /* Ignore items that must be nontransactional */
1583                 if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
1584                         continue;
1585
1586                 /* Scan to see if there are any locks belonging to current owner */
1587                 lockOwners = locallock->lockOwners;
1588                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1589                 {
1590                         if (lockOwners[i].owner == CurrentResourceOwner)
1591                         {
1592                                 Assert(lockOwners[i].nLocks > 0);
1593                                 if (lockOwners[i].nLocks < locallock->nLocks)
1594                                 {
1595                                         /*
1596                                          * We will still hold this lock after forgetting this
1597                                          * ResourceOwner.
1598                                          */
1599                                         locallock->nLocks -= lockOwners[i].nLocks;
1600                                         /* compact out unused slot */
1601                                         locallock->numLockOwners--;
1602                                         if (i < locallock->numLockOwners)
1603                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1604                                 }
1605                                 else
1606                                 {
1607                                         Assert(lockOwners[i].nLocks == locallock->nLocks);
1609                                         /* Collapse the counts to 1 so a single LockRelease call drops the lock entirely */
1609                                         lockOwners[i].nLocks = 1;
1610                                         locallock->nLocks = 1;
1611                                         if (!LockRelease(&locallock->tag.lock,
1612                                                                          locallock->tag.mode,
1613                                                                          false))
1614                                                 elog(WARNING, "LockReleaseCurrentOwner: failed??");
1615                                 }
1616                                 break;
1617                         }
1618                 }
1619         }
1620 }
1621
1622 /*
1623  * LockReassignCurrentOwner
1624  *              Reassign all locks belonging to CurrentResourceOwner to belong
1625  *              to its parent resource owner
1626  */
1627 void
1628 LockReassignCurrentOwner(void)
1629 {
1630         ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
1631         HASH_SEQ_STATUS status;
1632         LOCALLOCK  *locallock;
1633         LOCALLOCKOWNER *lockOwners;
1634
1635         Assert(parent != NULL);
1636
1637         hash_seq_init(&status, LockMethodLocalHash);
1638
1639         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1640         {
1641                 int                     i;
1642                 int                     ic = -1;
1643                 int                     ip = -1;
1644
1645                 /* Ignore items that must be nontransactional */
1646                 if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
1647                         continue;
1648
1649                 /*
1650                  * Scan to see if there are any locks belonging to current owner or
1651                  * its parent
1652                  */
1653                 lockOwners = locallock->lockOwners;
1654                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1655                 {
1656                         if (lockOwners[i].owner == CurrentResourceOwner)
1657                                 ic = i;
1658                         else if (lockOwners[i].owner == parent)
1659                                 ip = i;
1660                 }
1661
1662                 if (ic < 0)
1663                         continue;                       /* no current locks */
1664
1665                 if (ip < 0)
1666                 {
1667                         /* Parent has no slot, so just give it child's slot */
1668                         lockOwners[ic].owner = parent;
1669                 }
1670                 else
1671                 {
1672                         /* Merge child's count with parent's */
1673                         lockOwners[ip].nLocks += lockOwners[ic].nLocks;
1674                         /* compact out unused slot */
1675                         locallock->numLockOwners--;
1676                         if (ic < locallock->numLockOwners)
1677                                 lockOwners[ic] = lockOwners[locallock->numLockOwners];
1678                 }
1679         }
1680 }
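/*
 * The "compact out unused slot" step above (and the identical ones in
 * LockRelease and LockReleaseCurrentOwner) is ordinary swap-with-last array
 * deletion.  A distilled sketch, writing n for locallock->numLockOwners:
 *
 *		n--;
 *		if (i < n)
 *			lockOwners[i] = lockOwners[n];
 *
 * The first line shortens the logical array; the copy then moves the former
 * last entry into the vacated slot.  Ordering of lockOwners is irrelevant, so
 * this is cheaper than shifting the tail down one place.
 */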
1681
1682
1683 /*
1684  * AtPrepare_Locks
1685  *              Do the preparatory work for a PREPARE: make 2PC state file records
1686  *              for all locks currently held.
1687  *
1688  * Non-transactional locks are ignored.
1689  *
1690  * There are some special cases that we error out on: we can't be holding
1691  * any session locks (should be OK since only VACUUM uses those) and we
1692  * can't be holding any locks on temporary objects (since that would mess
1693  * up the current backend if it tries to exit before the prepared xact is
1694  * committed).
1695  */
1696 void
1697 AtPrepare_Locks(void)
1698 {
1699         HASH_SEQ_STATUS status;
1700         LOCALLOCK  *locallock;
1701
1702         /*
1703          * We don't need to touch shared memory for this --- all the necessary
1704          * state information is in the locallock table.
1705          */
1706         hash_seq_init(&status, LockMethodLocalHash);
1707
1708         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1709         {
1710                 TwoPhaseLockRecord record;
1711                 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1712                 int                     i;
1713
1714                 /* Ignore nontransactional locks */
1715                 if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
1716                         continue;
1717
1718                 /* Ignore it if we don't actually hold the lock */
1719                 if (locallock->nLocks <= 0)
1720                         continue;
1721
1722                 /* Scan to verify there are no session locks */
1723                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1724                 {
1725                         /* elog not ereport since this should not happen */
1726                         if (lockOwners[i].owner == NULL)
1727                                 elog(ERROR, "cannot PREPARE when session locks exist");
1728                 }
1729
1730                 /* Can't handle it if the lock is on a temporary object */
1731                 if (locallock->isTempObject)
1732                         ereport(ERROR,
1733                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1734                                          errmsg("cannot PREPARE a transaction that has operated on temporary tables")));
1735
1736                 /*
1737                  * Create a 2PC record.
1738                  */
1739                 memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
1740                 record.lockmode = locallock->tag.mode;
1741
1742                 RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
1743                                                            &record, sizeof(TwoPhaseLockRecord));
1744         }
1745 }
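/*
 * Sketch of the 2PC round trip for one held lock.  AtPrepare_Locks writes a
 * TwoPhaseLockRecord (just the LOCKTAG plus the LOCKMODE) per LOCALLOCK via
 * RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0, ...).  If the server
 * restarts before the transaction is resolved, twophase.c replays each such
 * record through its resource-manager callback table, which ends up calling
 *
 *		lock_twophase_recover(xid, 0, recdata, sizeof(TwoPhaseLockRecord));
 *
 * to re-grant the same lock mode to the prepared transaction's dummy PGPROC.
 * COMMIT PREPARED or ROLLBACK PREPARED later drops it again via
 * lock_twophase_postcommit/lock_twophase_postabort, defined below.
 */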
1746
1747 /*
1748  * PostPrepare_Locks
1749  *              Clean up after successful PREPARE
1750  *
1751  * Here, we want to transfer ownership of our locks to a dummy PGPROC
1752  * that's now associated with the prepared transaction, and we want to
1753  * clean out the corresponding entries in the LOCALLOCK table.
1754  *
1755  * Note: by removing the LOCALLOCK entries, we are leaving dangling
1756  * pointers in the transaction's resource owner.  This is OK at the
1757  * moment since resowner.c doesn't try to free locks one at a time at a
1758  * top-level transaction commit or abort.  We could alternatively zero out nLocks
1759  * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
1760  * but that probably costs more cycles.
1761  */
1762 void
1763 PostPrepare_Locks(TransactionId xid)
1764 {
1765         PGPROC     *newproc = TwoPhaseGetDummyProc(xid);
1766         HASH_SEQ_STATUS status;
1767         LOCALLOCK  *locallock;
1768         LOCK       *lock;
1769         PROCLOCK   *proclock;
1770         PROCLOCKTAG proclocktag;
1771         bool            found;
1772         int                     partition;
1773
1774         /* This is a critical section: any error means big trouble */
1775         START_CRIT_SECTION();
1776
1777         /*
1778          * First we run through the locallock table and get rid of unwanted
1779          * entries, then we scan the process's proclocks and transfer them to the
1780          * target proc.
1781          *
1782          * We do this separately because we may have multiple locallock entries
1783          * pointing to the same proclock, and we daren't end up with any dangling
1784          * pointers.
1785          */
1786         hash_seq_init(&status, LockMethodLocalHash);
1787
1788         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
1789         {
1790                 if (locallock->proclock == NULL || locallock->lock == NULL)
1791                 {
1792                         /*
1793                          * We must've run out of shared memory while trying to set up this
1794                          * lock.  Just forget the local entry.
1795                          */
1796                         Assert(locallock->nLocks == 0);
1797                         RemoveLocalLock(locallock);
1798                         continue;
1799                 }
1800
1801                 /* Ignore nontransactional locks */
1802                 if (!LockMethods[LOCALLOCK_LOCKMETHOD(*locallock)]->transactional)
1803                         continue;
1804
1805                 /* We already checked there are no session locks */
1806
1807                 /* Mark the proclock to show we need to release this lockmode */
1808                 if (locallock->nLocks > 0)
1809                         locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
1810
1811                 /* And remove the locallock hashtable entry */
1812                 RemoveLocalLock(locallock);
1813         }
1814
1815         /*
1816          * Now, scan each lock partition separately.
1817          */
1818         for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
1819         {
1820                 LWLockId        partitionLock = FirstLockMgrLock + partition;
1821                 SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
1822
1823                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
1824                                                                                          offsetof(PROCLOCK, procLink));
1825
1826                 if (!proclock)
1827                         continue;                       /* needn't examine this partition */
1828
1829                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
1830
1831                 while (proclock)
1832                 {
1833                         PROCLOCK   *nextplock;
1834                         LOCKMASK        holdMask;
1835                         PROCLOCK   *newproclock;
1836
1837                         /* Get link first, since we may unlink/delete this proclock */
1838                         nextplock = (PROCLOCK *)
1839                                 SHMQueueNext(procLocks, &proclock->procLink,
1840                                                          offsetof(PROCLOCK, procLink));
1841
1842                         Assert(proclock->tag.myProc == MyProc);
1843
1844                         lock = proclock->tag.myLock;
1845
1846                         /* Ignore nontransactional locks */
1847                         if (!LockMethods[LOCK_LOCKMETHOD(*lock)]->transactional)
1848                                 goto next_item;
1849
1850                         PROCLOCK_PRINT("PostPrepare_Locks", proclock);
1851                         LOCK_PRINT("PostPrepare_Locks", lock, 0);
1852                         Assert(lock->nRequested >= 0);
1853                         Assert(lock->nGranted >= 0);
1854                         Assert(lock->nGranted <= lock->nRequested);
1855                         Assert((proclock->holdMask & ~lock->grantMask) == 0);
1856
1857                         /*
1858                          * Since there were no session locks, we should be releasing all
1859                          * locks
1860                          */
1861                         if (proclock->releaseMask != proclock->holdMask)
1862                                 elog(PANIC, "we seem to have dropped a bit somewhere");
1863
1864                         holdMask = proclock->holdMask;
1865
1866                         /*
1867                          * We cannot simply modify proclock->tag.myProc to reassign
1868                          * ownership of the lock, because that's part of the hash key and
1869                          * the proclock would then be in the wrong hash chain.  So, unlink
1870                          * and delete the old proclock; create a new one with the right
1871                          * contents; and link it into place.  We do it in this order to be
1872                          * certain we won't run out of shared memory (the way dynahash.c
1873                          * works, the deleted object is certain to be available for
1874                          * reallocation).
1875                          */
1876                         SHMQueueDelete(&proclock->lockLink);
1877                         SHMQueueDelete(&proclock->procLink);
1878                         if (!hash_search(LockMethodProcLockHash,
1879                                                          (void *) &(proclock->tag),
1880                                                          HASH_REMOVE, NULL))
1881                                 elog(PANIC, "proclock table corrupted");
1882
1883                         /*
1884                          * Create the hash key for the new proclock table.
1885                          */
1886                         proclocktag.myLock = lock;
1887                         proclocktag.myProc = newproc;
1888
1889                         newproclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
1890                                                                                                    (void *) &proclocktag,
1891                                                                                                    HASH_ENTER_NULL, &found);
1892                         if (!newproclock)
1893                                 ereport(PANIC,          /* should not happen */
1894                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
1895                                                  errmsg("out of shared memory"),
1896                                                  errdetail("Not enough memory for reassigning the prepared transaction's locks.")));
1897
1898                         /*
1899                          * If new, initialize the new entry
1900                          */
1901                         if (!found)
1902                         {
1903                                 newproclock->holdMask = 0;
1904                                 newproclock->releaseMask = 0;
1905                                 /* Add new proclock to appropriate lists */
1906                                 SHMQueueInsertBefore(&lock->procLocks, &newproclock->lockLink);
1907                                 SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
1908                                                                          &newproclock->procLink);
1909                                 PROCLOCK_PRINT("PostPrepare_Locks: new", newproclock);
1910                         }
1911                         else
1912                         {
1913                                 PROCLOCK_PRINT("PostPrepare_Locks: found", newproclock);
1914                                 Assert((newproclock->holdMask & ~lock->grantMask) == 0);
1915                         }
1916
1917                         /*
1918                          * Pass over the identified lock ownership.
1919                          */
1920                         Assert((newproclock->holdMask & holdMask) == 0);
1921                         newproclock->holdMask |= holdMask;
1922
1923                 next_item:
1924                         proclock = nextplock;
1925                 } /* loop over PROCLOCKs within this partition */
1926
1927                 LWLockRelease(partitionLock);
1928         } /* loop over partitions */
1929
1930         END_CRIT_SECTION();
1931 }
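/*
 * The unlink/delete/re-enter sequence above is the standard way to "rename"
 * a dynahash entry whose key has to change.  Stripped of the shared-queue
 * maintenance, the pattern is (oldtag/newtag stand in for the real tags):
 *
 *		hash_search(LockMethodProcLockHash, &oldtag, HASH_REMOVE, NULL);
 *		newproclock = hash_search(LockMethodProcLockHash, &newtag,
 *								  HASH_ENTER_NULL, &found);
 *
 * Because dynahash recycles the just-removed entry from its freelist, the
 * HASH_ENTER_NULL step cannot fail for lack of shared memory, which is what
 * makes it safe to run inside the surrounding critical section.
 */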
1932
1933
1934 /*
1935  * Estimate shared-memory space used for lock tables
1936  */
1937 Size
1938 LockShmemSize(void)
1939 {
1940         Size            size = 0;
1941         long            max_table_size;
1942
1943         /* lock hash table */
1944         max_table_size = NLOCKENTS();
1945         size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
1946
1947         /* proclock hash table */
1948         max_table_size *= 2;
1949         size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
1950
1951         /*
1952          * Since NLOCKENTS is only an estimate, add 10% safety margin.
1953          */
1954         size = add_size(size, size / 10);
1955
1956         return size;
1957 }
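/*
 * Worked example with illustrative settings: max_locks_per_xact = 64,
 * MaxBackends = 100 and max_prepared_xacts = 5 give
 *
 *		NLOCKENTS() = 64 * (100 + 5) = 6720
 *
 * entries for the lock hash table and twice that (13440) for the proclock
 * hash table, before the 10% safety margin is added.
 */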
1958
1959 /*
1960  * GetLockStatusData - Return a summary of the lock manager's internal
1961  * status, for use in a user-level reporting function.
1962  *
1963  * The return data consists of an array of PROCLOCK objects, with the
1964  * associated PGPROC and LOCK objects for each.  Note that multiple
1965  * copies of the same PGPROC and/or LOCK objects are likely to appear.
1966  * It is the caller's responsibility to match up duplicates if wanted.
1967  *
1968  * The design goal is to hold the LWLocks for as short a time as possible;
1969  * thus, this function simply makes a copy of the necessary data and releases
1970  * the locks, allowing the caller to contemplate and format the data for as
1971  * long as it pleases.
1972  */
1973 LockData *
1974 GetLockStatusData(void)
1975 {
1976         LockData   *data;
1977         PROCLOCK   *proclock;
1978         HASH_SEQ_STATUS seqstat;
1979         int                     els;
1980         int                     el;
1981         int                     i;
1982
1983         data = (LockData *) palloc(sizeof(LockData));
1984
1985         /*
1986          * Acquire lock on the entire shared lock data structure.  We can't
1987          * operate one partition at a time if we want to deliver a self-consistent
1988          * view of the state.
1989          *
1990          * Since this is a read-only operation, we take shared instead of exclusive
1991          * lock.  There's not a whole lot of point to this, because all the normal
1992          * operations require exclusive lock, but it doesn't hurt anything either.
1993          * It will at least allow two backends to do GetLockStatusData in parallel.
1994          *
1995          * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
1996          */
1997         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
1998                 LWLockAcquire(FirstLockMgrLock + i, LW_SHARED);
1999
2000         /* Now we can safely count the number of proclocks */
2001         els = hash_get_num_entries(LockMethodProcLockHash);
2002
2003         data->nelements = els;
2004         data->proclocks = (PROCLOCK *) palloc(sizeof(PROCLOCK) * els);
2005         data->procs = (PGPROC *) palloc(sizeof(PGPROC) * els);
2006         data->locks = (LOCK *) palloc(sizeof(LOCK) * els);
2007
2008         /* Now scan the tables to copy the data */
2009         hash_seq_init(&seqstat, LockMethodProcLockHash);
2010
2011         el = 0;
2012         while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
2013         {
2014                 PGPROC     *proc = proclock->tag.myProc;
2015                 LOCK       *lock = proclock->tag.myLock;
2016
2017                 memcpy(&(data->proclocks[el]), proclock, sizeof(PROCLOCK));
2018                 memcpy(&(data->procs[el]), proc, sizeof(PGPROC));
2019                 memcpy(&(data->locks[el]), lock, sizeof(LOCK));
2020
2021                 el++;
2022         }
2023
2024         /* And release locks */
2025         for (i = NUM_LOCK_PARTITIONS; --i >= 0; )
2026                 LWLockRelease(FirstLockMgrLock + i);
2027
2028         Assert(el == data->nelements);
2029
2030         return data;
2031 }
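/*
 * A minimal consumption sketch (the real consumer is pg_lock_status() in
 * utils/adt/lockfuncs.c): the three arrays are parallel, so the caller just
 * walks them by index,
 *
 *		for (i = 0; i < data->nelements; i++)
 *		{
 *			PROCLOCK   *proclock = &data->proclocks[i];
 *			LOCK	   *lock = &data->locks[i];
 *			PGPROC	   *proc = &data->procs[i];
 *
 *			... format proclock->holdMask, lock->tag, proc->pid ...
 *		}
 *
 * Everything here is a private copy, so no LWLocks need be held while the
 * data is being formatted.
 */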
2032
2033 /* Provide the textual name of any lock mode */
2034 const char *
2035 GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
2036 {
2037         Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
2038         Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
2039         return LockMethods[lockmethodid]->lockModeNames[mode];
2040 }
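/*
 * Example (assuming the built-in DEFAULT_LOCKMETHOD name table):
 *
 *		GetLockmodeName(DEFAULT_LOCKMETHOD, AccessShareLock)
 *
 * returns the string "AccessShareLock", the form embedded in the lock
 * manager's WARNING and deadlock-report messages.
 */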
2041
2042 #ifdef LOCK_DEBUG
2043 /*
2044  * Dump all locks in the given proc's myProcLocks lists.
2045  *
2046  * Caller is responsible for having acquired appropriate LWLocks.
2047  */
2048 void
2049 DumpLocks(PGPROC *proc)
2050 {
2051         SHM_QUEUE  *procLocks;
2052         PROCLOCK   *proclock;
2053         LOCK       *lock;
2054         int                     i;
2055
2056         if (proc == NULL)
2057                 return;
2058
2059         if (proc->waitLock)
2060                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
2061
2062         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
2063         {
2064                 procLocks = &(proc->myProcLocks[i]);
2065
2066                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2067                                                                                          offsetof(PROCLOCK, procLink));
2068
2069                 while (proclock)
2070                 {
2071                         Assert(proclock->tag.myProc == proc);
2072
2073                         lock = proclock->tag.myLock;
2074
2075                         PROCLOCK_PRINT("DumpLocks", proclock);
2076                         LOCK_PRINT("DumpLocks", lock, 0);
2077
2078                         proclock = (PROCLOCK *)
2079                                 SHMQueueNext(procLocks, &proclock->procLink,
2080                                                          offsetof(PROCLOCK, procLink));
2081                 }
2082         }
2083 }
2084
2085 /*
2086  * Dump all lmgr locks.
2087  *
2088  * Caller is responsible for having acquired appropriate LWLocks.
2089  */
2090 void
2091 DumpAllLocks(void)
2092 {
2093         PGPROC     *proc;
2094         PROCLOCK   *proclock;
2095         LOCK       *lock;
2096         HASH_SEQ_STATUS status;
2097
2098         proc = MyProc;
2099
2100         if (proc && proc->waitLock)
2101                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
2102
2103         hash_seq_init(&status, LockMethodProcLockHash);
2104
2105         while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
2106         {
2107                 PROCLOCK_PRINT("DumpAllLocks", proclock);
2108
2109                 lock = proclock->tag.myLock;
2110                 if (lock)
2111                         LOCK_PRINT("DumpAllLocks", lock, 0);
2112                 else
2113                         elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
2114         }
2115 }
2116 #endif   /* LOCK_DEBUG */
2117
2118 /*
2119  * LOCK 2PC resource manager's routines
2120  */
2121
2122 /*
2123  * Re-acquire a lock belonging to a transaction that was prepared.
2124  *
2125  * Because this function is run at database startup, no user transactions are
2126  * running yet, so re-acquiring the locks cannot conflict with anything.  We
2127  * assume that the lock state represented by the stored 2PC files is legal.
2128  */
2129 void
2130 lock_twophase_recover(TransactionId xid, uint16 info,
2131                                           void *recdata, uint32 len)
2132 {
2133         TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
2134         PGPROC     *proc = TwoPhaseGetDummyProc(xid);
2135         LOCKTAG    *locktag;
2136         LOCKMODE        lockmode;
2137         LOCKMETHODID lockmethodid;
2138         LOCK       *lock;
2139         PROCLOCK   *proclock;
2140         PROCLOCKTAG proclocktag;
2141         bool            found;
2142         uint32          hashcode;
2143         uint32          proclock_hashcode;
2144         int                     partition;
2145         LWLockId        partitionLock;
2146         LockMethod      lockMethodTable;
2147
2148         Assert(len == sizeof(TwoPhaseLockRecord));
2149         locktag = &rec->locktag;
2150         lockmode = rec->lockmode;
2151         lockmethodid = locktag->locktag_lockmethodid;
2152
2153         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2154                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2155         lockMethodTable = LockMethods[lockmethodid];
2156
2157         hashcode = LockTagHashCode(locktag);
2158         partition = LockHashPartition(hashcode);
2159         partitionLock = LockHashPartitionLock(hashcode);
2160
2161         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2162
2163         /*
2164          * Find or create a lock with this tag.
2165          */
2166         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2167                                                                                                 (void *) locktag,
2168                                                                                                 hashcode,
2169                                                                                                 HASH_ENTER_NULL,
2170                                                                                                 &found);
2171         if (!lock)
2172         {
2173                 LWLockRelease(partitionLock);
2174                 ereport(ERROR,
2175                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2176                                  errmsg("out of shared memory"),
2177                         errhint("You may need to increase max_locks_per_transaction.")));
2178         }
2179
2180         /*
2181          * if it's a new lock object, initialize it
2182          */
2183         if (!found)
2184         {
2185                 lock->grantMask = 0;
2186                 lock->waitMask = 0;
2187                 SHMQueueInit(&(lock->procLocks));
2188                 ProcQueueInit(&(lock->waitProcs));
2189                 lock->nRequested = 0;
2190                 lock->nGranted = 0;
2191                 MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
2192                 MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
2193                 LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
2194         }
2195         else
2196         {
2197                 LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
2198                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
2199                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
2200                 Assert(lock->nGranted <= lock->nRequested);
2201         }
2202
2203         /*
2204          * Create the hash key for the proclock table.
2205          */
2206         proclocktag.myLock = lock;
2207         proclocktag.myProc = proc;
2208
2209         proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
2210
2211         /*
2212          * Find or create a proclock entry with this tag
2213          */
2214         proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
2215                                                                                                                 (void *) &proclocktag,
2216                                                                                                                 proclock_hashcode,
2217                                                                                                                 HASH_ENTER_NULL,
2218                                                                                                                 &found);
2219         if (!proclock)
2220         {
2221                 /* Oops, not enough shared memory for the proclock */
2222                 if (lock->nRequested == 0)
2223                 {
2224                         /*
2225                          * There are no other requestors of this lock, so garbage-collect
2226                          * the lock object.  We *must* do this to avoid a permanent leak
2227                          * of shared memory, because there won't be anything to cause
2228                          * anyone to release the lock object later.
2229                          */
2230                         Assert(SHMQueueEmpty(&(lock->procLocks)));
2231                         if (!hash_search_with_hash_value(LockMethodLockHash,
2232                                                                                          (void *) &(lock->tag),
2233                                                                                          hashcode,
2234                                                                                          HASH_REMOVE,
2235                                                                                          NULL))
2236                                 elog(PANIC, "lock table corrupted");
2237                 }
2238                 LWLockRelease(partitionLock);
2239                 ereport(ERROR,
2240                                 (errcode(ERRCODE_OUT_OF_MEMORY),
2241                                  errmsg("out of shared memory"),
2242                         errhint("You may need to increase max_locks_per_transaction.")));
2243         }
2244
2245         /*
2246          * If new, initialize the new entry
2247          */
2248         if (!found)
2249         {
2250                 proclock->holdMask = 0;
2251                 proclock->releaseMask = 0;
2252                 /* Add proclock to appropriate lists */
2253                 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
2254                 SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
2255                                                          &proclock->procLink);
2256                 PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
2257         }
2258         else
2259         {
2260                 PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
2261                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
2262         }
2263
2264         /*
2265          * lock->nRequested and lock->requested[] count the total number of
2266          * requests, whether granted or waiting, so increment those immediately.
2267          */
2268         lock->nRequested++;
2269         lock->requested[lockmode]++;
2270         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
2271
2272         /*
2273          * We shouldn't already hold the desired lock.
2274          */
2275         if (proclock->holdMask & LOCKBIT_ON(lockmode))
2276                 elog(ERROR, "lock %s on object %u/%u/%u is already held",
2277                          lockMethodTable->lockModeNames[lockmode],
2278                          lock->tag.locktag_field1, lock->tag.locktag_field2,
2279                          lock->tag.locktag_field3);
2280
2281         /*
2282          * We ignore any possible conflicts and just grant ourselves the lock.
2283          */
2284         GrantLock(lock, proclock, lockmode);
2285
2286         LWLockRelease(partitionLock);
2287 }
2288
2289 /*
2290  * 2PC processing routine for COMMIT PREPARED case.
2291  *
2292  * Find and release the lock indicated by the 2PC record.
2293  */
2294 void
2295 lock_twophase_postcommit(TransactionId xid, uint16 info,
2296                                                  void *recdata, uint32 len)
2297 {
2298         TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
2299         PGPROC     *proc = TwoPhaseGetDummyProc(xid);
2300         LOCKTAG    *locktag;
2301         LOCKMODE        lockmode;
2302         LOCKMETHODID lockmethodid;
2303         LOCK       *lock;
2304         PROCLOCK   *proclock;
2305         PROCLOCKTAG proclocktag;
2306         uint32          hashcode;
2307         uint32          proclock_hashcode;
2308         LWLockId        partitionLock;
2309         LockMethod      lockMethodTable;
2310         bool            wakeupNeeded;
2311
2312         Assert(len == sizeof(TwoPhaseLockRecord));
2313         locktag = &rec->locktag;
2314         lockmode = rec->lockmode;
2315         lockmethodid = locktag->locktag_lockmethodid;
2316
2317         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2318                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2319         lockMethodTable = LockMethods[lockmethodid];
2320
2321         hashcode = LockTagHashCode(locktag);
2322         partitionLock = LockHashPartitionLock(hashcode);
2323
2324         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2325
2326         /*
2327          * Re-find the lock object (it had better be there).
2328          */
2329         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2330                                                                                                 (void *) locktag,
2331                                                                                                 hashcode,
2332                                                                                                 HASH_FIND,
2333                                                                                                 NULL);
2334         if (!lock)
2335                 elog(PANIC, "failed to re-find shared lock object");
2336
2337         /*
2338          * Re-find the proclock object (ditto).
2339          */
2340         proclocktag.myLock = lock;
2341         proclocktag.myProc = proc;
2342
2343         proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
2344
2345         proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
2346                                                                                                                 (void *) &proclocktag,
2347                                                                                                                 proclock_hashcode,
2348                                                                                                                 HASH_FIND,
2349                                                                                                                 NULL);
2350         if (!proclock)
2351                 elog(PANIC, "failed to re-find shared proclock object");
2352
2353         /*
2354          * Double-check that we are actually holding a lock of the type we want to
2355          * release.
2356          */
2357         if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
2358         {
2359                 PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
2360                 LWLockRelease(partitionLock);
2361                 elog(WARNING, "you don't own a lock of type %s",
2362                          lockMethodTable->lockModeNames[lockmode]);
2363                 return;
2364         }
2365
2366         /*
2367          * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
2368          */
2369         wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
2370
2371         CleanUpLock(lock, proclock,
2372                                 lockMethodTable, hashcode,
2373                                 wakeupNeeded);
2374
2375         LWLockRelease(partitionLock);
2376 }
2377
2378 /*
2379  * 2PC processing routine for ROLLBACK PREPARED case.
2380  *
2381  * This is actually just the same as the COMMIT case.
2382  */
2383 void
2384 lock_twophase_postabort(TransactionId xid, uint16 info,
2385                                                 void *recdata, uint32 len)
2386 {
2387         lock_twophase_postcommit(xid, info, recdata, len);
2388 }