]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Fix initialization of fake LSN for unlogged relations
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES primary lock mechanism
5  *
6  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        src/backend/storage/lmgr/lock.c
12  *
13  * NOTES
14  *        A lock table is a shared memory hash table.  When
15  *        a process tries to acquire a lock of a type that conflicts
16  *        with existing locks, it is put to sleep using the routines
17  *        in storage/lmgr/proc.c.
18  *
19  *        For the most part, this code should be invoked via lmgr.c
20  *        or another lock-management module, not directly.
21  *
22  *      Interface:
23  *
24  *      InitLocks(), GetLocksMethodTable(), GetLockTagsMethodTable(),
25  *      LockAcquire(), LockRelease(), LockReleaseAll(),
26  *      LockCheckConflicts(), GrantLock()
27  *
28  *-------------------------------------------------------------------------
29  */
30 #include "postgres.h"
31
32 #include <signal.h>
33 #include <unistd.h>
34
35 #include "access/transam.h"
36 #include "access/twophase.h"
37 #include "access/twophase_rmgr.h"
38 #include "access/xact.h"
39 #include "access/xlog.h"
40 #include "miscadmin.h"
41 #include "pg_trace.h"
42 #include "pgstat.h"
43 #include "storage/proc.h"
44 #include "storage/procarray.h"
45 #include "storage/sinvaladt.h"
46 #include "storage/spin.h"
47 #include "storage/standby.h"
48 #include "utils/memutils.h"
49 #include "utils/ps_status.h"
50 #include "utils/resowner_private.h"
51
52
53 /* This configuration variable is used to set the lock table size */
54 int                     max_locks_per_xact; /* set by guc.c */
55
56 #define NLOCKENTS() \
57         mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
58
59
60 /*
61  * Data structures defining the semantics of the standard lock methods.
62  *
63  * The conflict table defines the semantics of the various lock modes.
64  */
65 static const LOCKMASK LockConflicts[] = {
66         0,
67
68         /* AccessShareLock */
69         LOCKBIT_ON(AccessExclusiveLock),
70
71         /* RowShareLock */
72         LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
73
74         /* RowExclusiveLock */
75         LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
76         LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
77
78         /* ShareUpdateExclusiveLock */
79         LOCKBIT_ON(ShareUpdateExclusiveLock) |
80         LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
81         LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
82
83         /* ShareLock */
84         LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
85         LOCKBIT_ON(ShareRowExclusiveLock) |
86         LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
87
88         /* ShareRowExclusiveLock */
89         LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
90         LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
91         LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
92
93         /* ExclusiveLock */
94         LOCKBIT_ON(RowShareLock) |
95         LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
96         LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
97         LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock),
98
99         /* AccessExclusiveLock */
100         LOCKBIT_ON(AccessShareLock) | LOCKBIT_ON(RowShareLock) |
101         LOCKBIT_ON(RowExclusiveLock) | LOCKBIT_ON(ShareUpdateExclusiveLock) |
102         LOCKBIT_ON(ShareLock) | LOCKBIT_ON(ShareRowExclusiveLock) |
103         LOCKBIT_ON(ExclusiveLock) | LOCKBIT_ON(AccessExclusiveLock)
104
105 };
106
107 /* Names of lock modes, for debug printouts */
108 static const char *const lock_mode_names[] =
109 {
110         "INVALID",
111         "AccessShareLock",
112         "RowShareLock",
113         "RowExclusiveLock",
114         "ShareUpdateExclusiveLock",
115         "ShareLock",
116         "ShareRowExclusiveLock",
117         "ExclusiveLock",
118         "AccessExclusiveLock"
119 };
120
121 #ifndef LOCK_DEBUG
122 static bool Dummy_trace = false;
123 #endif
124
125 static const LockMethodData default_lockmethod = {
126         AccessExclusiveLock,            /* highest valid lock mode number */
127         LockConflicts,
128         lock_mode_names,
129 #ifdef LOCK_DEBUG
130         &Trace_locks
131 #else
132         &Dummy_trace
133 #endif
134 };
135
136 static const LockMethodData user_lockmethod = {
137         AccessExclusiveLock,            /* highest valid lock mode number */
138         LockConflicts,
139         lock_mode_names,
140 #ifdef LOCK_DEBUG
141         &Trace_userlocks
142 #else
143         &Dummy_trace
144 #endif
145 };
146
147 /*
148  * map from lock method id to the lock table data structures
149  */
150 static const LockMethod LockMethods[] = {
151         NULL,
152         &default_lockmethod,
153         &user_lockmethod
154 };
155
156
157 /* Record that's written to 2PC state file when a lock is persisted */
158 typedef struct TwoPhaseLockRecord
159 {
160         LOCKTAG         locktag;
161         LOCKMODE        lockmode;
162 } TwoPhaseLockRecord;
163
164
165 /*
166  * Count of the number of fast path lock slots we believe to be used.  This
167  * might be higher than the real number if another backend has transferred
168  * our locks to the primary lock table, but it can never be lower than the
169  * real value, since only we can acquire locks on our own behalf.
170  */
171 static int      FastPathLocalUseCount = 0;
172
173 /* Macros for manipulating proc->fpLockBits */
174 #define FAST_PATH_BITS_PER_SLOT                 3
175 #define FAST_PATH_LOCKNUMBER_OFFSET             1
176 #define FAST_PATH_MASK                                  ((1 << FAST_PATH_BITS_PER_SLOT) - 1)
177 #define FAST_PATH_GET_BITS(proc, n) \
178         (((proc)->fpLockBits >> (FAST_PATH_BITS_PER_SLOT * n)) & FAST_PATH_MASK)
179 #define FAST_PATH_BIT_POSITION(n, l) \
180         (AssertMacro((l) >= FAST_PATH_LOCKNUMBER_OFFSET), \
181          AssertMacro((l) < FAST_PATH_BITS_PER_SLOT+FAST_PATH_LOCKNUMBER_OFFSET), \
182          AssertMacro((n) < FP_LOCK_SLOTS_PER_BACKEND), \
183          ((l) - FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT * (n)))
184 #define FAST_PATH_SET_LOCKMODE(proc, n, l) \
185          (proc)->fpLockBits |= UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)
186 #define FAST_PATH_CLEAR_LOCKMODE(proc, n, l) \
187          (proc)->fpLockBits &= ~(UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l))
188 #define FAST_PATH_CHECK_LOCKMODE(proc, n, l) \
189          ((proc)->fpLockBits & (UINT64CONST(1) << FAST_PATH_BIT_POSITION(n, l)))
190
191 /*
192  * The fast-path lock mechanism is concerned only with relation locks on
193  * unshared relations by backends bound to a database.  The fast-path
194  * mechanism exists mostly to accelerate acquisition and release of locks
195  * that rarely conflict.  Because ShareUpdateExclusiveLock is
196  * self-conflicting, it can't use the fast-path mechanism; but it also does
197  * not conflict with any of the locks that do, so we can ignore it completely.
198  */
199 #define EligibleForRelationFastPath(locktag, mode) \
200         ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
201         (locktag)->locktag_type == LOCKTAG_RELATION && \
202         (locktag)->locktag_field1 == MyDatabaseId && \
203         MyDatabaseId != InvalidOid && \
204         (mode) < ShareUpdateExclusiveLock)
205 #define ConflictsWithRelationFastPath(locktag, mode) \
206         ((locktag)->locktag_lockmethodid == DEFAULT_LOCKMETHOD && \
207         (locktag)->locktag_type == LOCKTAG_RELATION && \
208         (locktag)->locktag_field1 != InvalidOid && \
209         (mode) > ShareUpdateExclusiveLock)
210
211 static bool FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode);
212 static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode);
213 static bool FastPathTransferRelationLocks(LockMethod lockMethodTable,
214                                                                                   const LOCKTAG *locktag, uint32 hashcode);
215 static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock);
216
217 /*
218  * To make the fast-path lock mechanism work, we must have some way of
219  * preventing the use of the fast-path when a conflicting lock might be present.
220  * We partition* the locktag space into FAST_PATH_STRONG_LOCK_HASH_PARTITIONS,
221  * and maintain an integer count of the number of "strong" lockers
222  * in each partition.  When any "strong" lockers are present (which is
223  * hopefully not very often), the fast-path mechanism can't be used, and we
224  * must fall back to the slower method of pushing matching locks directly
225  * into the main lock tables.
226  *
227  * The deadlock detector does not know anything about the fast path mechanism,
228  * so any locks that might be involved in a deadlock must be transferred from
229  * the fast-path queues to the main lock table.
230  */
231
232 #define FAST_PATH_STRONG_LOCK_HASH_BITS                 10
233 #define FAST_PATH_STRONG_LOCK_HASH_PARTITIONS \
234         (1 << FAST_PATH_STRONG_LOCK_HASH_BITS)
235 #define FastPathStrongLockHashPartition(hashcode) \
236         ((hashcode) % FAST_PATH_STRONG_LOCK_HASH_PARTITIONS)
237
238 typedef struct
239 {
240         slock_t         mutex;
241         uint32          count[FAST_PATH_STRONG_LOCK_HASH_PARTITIONS];
242 } FastPathStrongRelationLockData;
243
244 static volatile FastPathStrongRelationLockData *FastPathStrongRelationLocks;
245
246
247 /*
248  * Pointers to hash tables containing lock state
249  *
250  * The LockMethodLockHash and LockMethodProcLockHash hash tables are in
251  * shared memory; LockMethodLocalHash is local to each backend.
252  */
253 static HTAB *LockMethodLockHash;
254 static HTAB *LockMethodProcLockHash;
255 static HTAB *LockMethodLocalHash;
256
257
258 /* private state for error cleanup */
259 static LOCALLOCK *StrongLockInProgress;
260 static LOCALLOCK *awaitedLock;
261 static ResourceOwner awaitedOwner;
262
263
264 #ifdef LOCK_DEBUG
265
266 /*------
267  * The following configuration options are available for lock debugging:
268  *
269  *         TRACE_LOCKS          -- give a bunch of output what's going on in this file
270  *         TRACE_USERLOCKS      -- same but for user locks
271  *         TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
272  *                                                 (use to avoid output on system tables)
273  *         TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
274  *         DEBUG_DEADLOCKS      -- currently dumps locks at untimely occasions ;)
275  *
276  * Furthermore, but in storage/lmgr/lwlock.c:
277  *         TRACE_LWLOCKS        -- trace lightweight locks (pretty useless)
278  *
279  * Define LOCK_DEBUG at compile time to get all these enabled.
280  * --------
281  */
282
283 int                     Trace_lock_oidmin = FirstNormalObjectId;
284 bool            Trace_locks = false;
285 bool            Trace_userlocks = false;
286 int                     Trace_lock_table = 0;
287 bool            Debug_deadlocks = false;
288
289
290 inline static bool
291 LOCK_DEBUG_ENABLED(const LOCKTAG *tag)
292 {
293         return
294                 (*(LockMethods[tag->locktag_lockmethodid]->trace_flag) &&
295                  ((Oid) tag->locktag_field2 >= (Oid) Trace_lock_oidmin))
296                 || (Trace_lock_table &&
297                         (tag->locktag_field2 == Trace_lock_table));
298 }
299
300
301 inline static void
302 LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
303 {
304         if (LOCK_DEBUG_ENABLED(&lock->tag))
305                 elog(LOG,
306                          "%s: lock(%p) id(%u,%u,%u,%u,%u,%u) grantMask(%x) "
307                          "req(%d,%d,%d,%d,%d,%d,%d)=%d "
308                          "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
309                          where, lock,
310                          lock->tag.locktag_field1, lock->tag.locktag_field2,
311                          lock->tag.locktag_field3, lock->tag.locktag_field4,
312                          lock->tag.locktag_type, lock->tag.locktag_lockmethodid,
313                          lock->grantMask,
314                          lock->requested[1], lock->requested[2], lock->requested[3],
315                          lock->requested[4], lock->requested[5], lock->requested[6],
316                          lock->requested[7], lock->nRequested,
317                          lock->granted[1], lock->granted[2], lock->granted[3],
318                          lock->granted[4], lock->granted[5], lock->granted[6],
319                          lock->granted[7], lock->nGranted,
320                          lock->waitProcs.size,
321                          LockMethods[LOCK_LOCKMETHOD(*lock)]->lockModeNames[type]);
322 }
323
324
325 inline static void
326 PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
327 {
328         if (LOCK_DEBUG_ENABLED(&proclockP->tag.myLock->tag))
329                 elog(LOG,
330                          "%s: proclock(%p) lock(%p) method(%u) proc(%p) hold(%x)",
331                          where, proclockP, proclockP->tag.myLock,
332                          PROCLOCK_LOCKMETHOD(*(proclockP)),
333                          proclockP->tag.myProc, (int) proclockP->holdMask);
334 }
335 #else                                                   /* not LOCK_DEBUG */
336
337 #define LOCK_PRINT(where, lock, type)  ((void) 0)
338 #define PROCLOCK_PRINT(where, proclockP)  ((void) 0)
339 #endif                                                  /* not LOCK_DEBUG */
340
341
342 static uint32 proclock_hash(const void *key, Size keysize);
343 static void RemoveLocalLock(LOCALLOCK *locallock);
344 static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
345                                                                   const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
346 static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
347 static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
348 static void FinishStrongLockAcquire(void);
349 static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
350 static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock);
351 static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent);
352 static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
353                                                 PROCLOCK *proclock, LockMethod lockMethodTable);
354 static void CleanUpLock(LOCK *lock, PROCLOCK *proclock,
355                                                 LockMethod lockMethodTable, uint32 hashcode,
356                                                 bool wakeupNeeded);
357 static void LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
358                                                                  LOCKTAG *locktag, LOCKMODE lockmode,
359                                                                  bool decrement_strong_lock_count);
360 static void GetSingleProcBlockerStatusData(PGPROC *blocked_proc,
361                                                                                    BlockedProcsData *data);
362
363
364 /*
365  * InitLocks -- Initialize the lock manager's data structures.
366  *
367  * This is called from CreateSharedMemoryAndSemaphores(), which see for
368  * more comments.  In the normal postmaster case, the shared hash tables
369  * are created here, as well as a locallock hash table that will remain
370  * unused and empty in the postmaster itself.  Backends inherit the pointers
371  * to the shared tables via fork(), and also inherit an image of the locallock
372  * hash table, which they proceed to use.  In the EXEC_BACKEND case, each
373  * backend re-executes this code to obtain pointers to the already existing
374  * shared hash tables and to create its locallock hash table.
375  */
376 void
377 InitLocks(void)
378 {
379         HASHCTL         info;
380         long            init_table_size,
381                                 max_table_size;
382         bool            found;
383
384         /*
385          * Compute init/max size to request for lock hashtables.  Note these
386          * calculations must agree with LockShmemSize!
387          */
388         max_table_size = NLOCKENTS();
389         init_table_size = max_table_size / 2;
390
391         /*
392          * Allocate hash table for LOCK structs.  This stores per-locked-object
393          * information.
394          */
395         MemSet(&info, 0, sizeof(info));
396         info.keysize = sizeof(LOCKTAG);
397         info.entrysize = sizeof(LOCK);
398         info.num_partitions = NUM_LOCK_PARTITIONS;
399
400         LockMethodLockHash = ShmemInitHash("LOCK hash",
401                                                                            init_table_size,
402                                                                            max_table_size,
403                                                                            &info,
404                                                                            HASH_ELEM | HASH_BLOBS | HASH_PARTITION);
405
406         /* Assume an average of 2 holders per lock */
407         max_table_size *= 2;
408         init_table_size *= 2;
409
410         /*
411          * Allocate hash table for PROCLOCK structs.  This stores
412          * per-lock-per-holder information.
413          */
414         info.keysize = sizeof(PROCLOCKTAG);
415         info.entrysize = sizeof(PROCLOCK);
416         info.hash = proclock_hash;
417         info.num_partitions = NUM_LOCK_PARTITIONS;
418
419         LockMethodProcLockHash = ShmemInitHash("PROCLOCK hash",
420                                                                                    init_table_size,
421                                                                                    max_table_size,
422                                                                                    &info,
423                                                                                    HASH_ELEM | HASH_FUNCTION | HASH_PARTITION);
424
425         /*
426          * Allocate fast-path structures.
427          */
428         FastPathStrongRelationLocks =
429                 ShmemInitStruct("Fast Path Strong Relation Lock Data",
430                                                 sizeof(FastPathStrongRelationLockData), &found);
431         if (!found)
432                 SpinLockInit(&FastPathStrongRelationLocks->mutex);
433
434         /*
435          * Allocate non-shared hash table for LOCALLOCK structs.  This stores lock
436          * counts and resource owner information.
437          *
438          * The non-shared table could already exist in this process (this occurs
439          * when the postmaster is recreating shared memory after a backend crash).
440          * If so, delete and recreate it.  (We could simply leave it, since it
441          * ought to be empty in the postmaster, but for safety let's zap it.)
442          */
443         if (LockMethodLocalHash)
444                 hash_destroy(LockMethodLocalHash);
445
446         info.keysize = sizeof(LOCALLOCKTAG);
447         info.entrysize = sizeof(LOCALLOCK);
448
449         LockMethodLocalHash = hash_create("LOCALLOCK hash",
450                                                                           16,
451                                                                           &info,
452                                                                           HASH_ELEM | HASH_BLOBS);
453 }
454
455
456 /*
457  * Fetch the lock method table associated with a given lock
458  */
459 LockMethod
460 GetLocksMethodTable(const LOCK *lock)
461 {
462         LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*lock);
463
464         Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
465         return LockMethods[lockmethodid];
466 }
467
468 /*
469  * Fetch the lock method table associated with a given locktag
470  */
471 LockMethod
472 GetLockTagsMethodTable(const LOCKTAG *locktag)
473 {
474         LOCKMETHODID lockmethodid = (LOCKMETHODID) locktag->locktag_lockmethodid;
475
476         Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
477         return LockMethods[lockmethodid];
478 }
479
480
481 /*
482  * Compute the hash code associated with a LOCKTAG.
483  *
484  * To avoid unnecessary recomputations of the hash code, we try to do this
485  * just once per function, and then pass it around as needed.  Aside from
486  * passing the hashcode to hash_search_with_hash_value(), we can extract
487  * the lock partition number from the hashcode.
488  */
489 uint32
490 LockTagHashCode(const LOCKTAG *locktag)
491 {
492         return get_hash_value(LockMethodLockHash, (const void *) locktag);
493 }
494
495 /*
496  * Compute the hash code associated with a PROCLOCKTAG.
497  *
498  * Because we want to use just one set of partition locks for both the
499  * LOCK and PROCLOCK hash tables, we have to make sure that PROCLOCKs
500  * fall into the same partition number as their associated LOCKs.
501  * dynahash.c expects the partition number to be the low-order bits of
502  * the hash code, and therefore a PROCLOCKTAG's hash code must have the
503  * same low-order bits as the associated LOCKTAG's hash code.  We achieve
504  * this with this specialized hash function.
505  */
506 static uint32
507 proclock_hash(const void *key, Size keysize)
508 {
509         const PROCLOCKTAG *proclocktag = (const PROCLOCKTAG *) key;
510         uint32          lockhash;
511         Datum           procptr;
512
513         Assert(keysize == sizeof(PROCLOCKTAG));
514
515         /* Look into the associated LOCK object, and compute its hash code */
516         lockhash = LockTagHashCode(&proclocktag->myLock->tag);
517
518         /*
519          * To make the hash code also depend on the PGPROC, we xor the proc
520          * struct's address into the hash code, left-shifted so that the
521          * partition-number bits don't change.  Since this is only a hash, we
522          * don't care if we lose high-order bits of the address; use an
523          * intermediate variable to suppress cast-pointer-to-int warnings.
524          */
525         procptr = PointerGetDatum(proclocktag->myProc);
526         lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
527
528         return lockhash;
529 }
530
531 /*
532  * Compute the hash code associated with a PROCLOCKTAG, given the hashcode
533  * for its underlying LOCK.
534  *
535  * We use this just to avoid redundant calls of LockTagHashCode().
536  */
537 static inline uint32
538 ProcLockHashCode(const PROCLOCKTAG *proclocktag, uint32 hashcode)
539 {
540         uint32          lockhash = hashcode;
541         Datum           procptr;
542
543         /*
544          * This must match proclock_hash()!
545          */
546         procptr = PointerGetDatum(proclocktag->myProc);
547         lockhash ^= ((uint32) procptr) << LOG2_NUM_LOCK_PARTITIONS;
548
549         return lockhash;
550 }
551
552 /*
553  * Given two lock modes, return whether they would conflict.
554  */
555 bool
556 DoLockModesConflict(LOCKMODE mode1, LOCKMODE mode2)
557 {
558         LockMethod      lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
559
560         if (lockMethodTable->conflictTab[mode1] & LOCKBIT_ON(mode2))
561                 return true;
562
563         return false;
564 }
565
566 /*
567  * LockHeldByMe -- test whether lock 'locktag' is held with mode 'lockmode'
568  *              by the current transaction
569  */
570 bool
571 LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
572 {
573         LOCALLOCKTAG localtag;
574         LOCALLOCK  *locallock;
575
576         /*
577          * See if there is a LOCALLOCK entry for this lock and lockmode
578          */
579         MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
580         localtag.lock = *locktag;
581         localtag.mode = lockmode;
582
583         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
584                                                                                   (void *) &localtag,
585                                                                                   HASH_FIND, NULL);
586
587         return (locallock && locallock->nLocks > 0);
588 }
589
590 /*
591  * LockHasWaiters -- look up 'locktag' and check if releasing this
592  *              lock would wake up other processes waiting for it.
593  */
594 bool
595 LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
596 {
597         LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
598         LockMethod      lockMethodTable;
599         LOCALLOCKTAG localtag;
600         LOCALLOCK  *locallock;
601         LOCK       *lock;
602         PROCLOCK   *proclock;
603         LWLock     *partitionLock;
604         bool            hasWaiters = false;
605
606         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
607                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
608         lockMethodTable = LockMethods[lockmethodid];
609         if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
610                 elog(ERROR, "unrecognized lock mode: %d", lockmode);
611
612 #ifdef LOCK_DEBUG
613         if (LOCK_DEBUG_ENABLED(locktag))
614                 elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
615                          locktag->locktag_field1, locktag->locktag_field2,
616                          lockMethodTable->lockModeNames[lockmode]);
617 #endif
618
619         /*
620          * Find the LOCALLOCK entry for this lock and lockmode
621          */
622         MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
623         localtag.lock = *locktag;
624         localtag.mode = lockmode;
625
626         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
627                                                                                   (void *) &localtag,
628                                                                                   HASH_FIND, NULL);
629
630         /*
631          * let the caller print its own error message, too. Do not ereport(ERROR).
632          */
633         if (!locallock || locallock->nLocks <= 0)
634         {
635                 elog(WARNING, "you don't own a lock of type %s",
636                          lockMethodTable->lockModeNames[lockmode]);
637                 return false;
638         }
639
640         /*
641          * Check the shared lock table.
642          */
643         partitionLock = LockHashPartitionLock(locallock->hashcode);
644
645         LWLockAcquire(partitionLock, LW_SHARED);
646
647         /*
648          * We don't need to re-find the lock or proclock, since we kept their
649          * addresses in the locallock table, and they couldn't have been removed
650          * while we were holding a lock on them.
651          */
652         lock = locallock->lock;
653         LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
654         proclock = locallock->proclock;
655         PROCLOCK_PRINT("LockHasWaiters: found", proclock);
656
657         /*
658          * Double-check that we are actually holding a lock of the type we want to
659          * release.
660          */
661         if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
662         {
663                 PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
664                 LWLockRelease(partitionLock);
665                 elog(WARNING, "you don't own a lock of type %s",
666                          lockMethodTable->lockModeNames[lockmode]);
667                 RemoveLocalLock(locallock);
668                 return false;
669         }
670
671         /*
672          * Do the checking.
673          */
674         if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
675                 hasWaiters = true;
676
677         LWLockRelease(partitionLock);
678
679         return hasWaiters;
680 }
681
682 /*
683  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
684  *              set lock if/when no conflicts.
685  *
686  * Inputs:
687  *      locktag: unique identifier for the lockable object
688  *      lockmode: lock mode to acquire
689  *      sessionLock: if true, acquire lock for session not current transaction
690  *      dontWait: if true, don't wait to acquire lock
691  *
692  * Returns one of:
693  *              LOCKACQUIRE_NOT_AVAIL           lock not available, and dontWait=true
694  *              LOCKACQUIRE_OK                          lock successfully acquired
695  *              LOCKACQUIRE_ALREADY_HELD        incremented count for lock already held
696  *              LOCKACQUIRE_ALREADY_CLEAR       incremented count for lock already clear
697  *
698  * In the normal case where dontWait=false and the caller doesn't need to
699  * distinguish a freshly acquired lock from one already taken earlier in
700  * this same transaction, there is no need to examine the return value.
701  *
702  * Side Effects: The lock is acquired and recorded in lock tables.
703  *
704  * NOTE: if we wait for the lock, there is no way to abort the wait
705  * short of aborting the transaction.
706  */
707 LockAcquireResult
708 LockAcquire(const LOCKTAG *locktag,
709                         LOCKMODE lockmode,
710                         bool sessionLock,
711                         bool dontWait)
712 {
713         return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
714                                                            true, NULL);
715 }
716
717 /*
718  * LockAcquireExtended - allows us to specify additional options
719  *
720  * reportMemoryError specifies whether a lock request that fills the lock
721  * table should generate an ERROR or not.  Passing "false" allows the caller
722  * to attempt to recover from lock-table-full situations, perhaps by forcibly
723  * cancelling other lock holders and then retrying.  Note, however, that the
724  * return code for that is LOCKACQUIRE_NOT_AVAIL, so that it's unsafe to use
725  * in combination with dontWait = true, as the cause of failure couldn't be
726  * distinguished.
727  *
728  * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
729  * table entry if a lock is successfully acquired, or NULL if not.
730  */
731 LockAcquireResult
732 LockAcquireExtended(const LOCKTAG *locktag,
733                                         LOCKMODE lockmode,
734                                         bool sessionLock,
735                                         bool dontWait,
736                                         bool reportMemoryError,
737                                         LOCALLOCK **locallockp)
738 {
739         LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
740         LockMethod      lockMethodTable;
741         LOCALLOCKTAG localtag;
742         LOCALLOCK  *locallock;
743         LOCK       *lock;
744         PROCLOCK   *proclock;
745         bool            found;
746         ResourceOwner owner;
747         uint32          hashcode;
748         LWLock     *partitionLock;
749         int                     status;
750         bool            log_lock = false;
751
752         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
753                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
754         lockMethodTable = LockMethods[lockmethodid];
755         if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
756                 elog(ERROR, "unrecognized lock mode: %d", lockmode);
757
758         if (RecoveryInProgress() && !InRecovery &&
759                 (locktag->locktag_type == LOCKTAG_OBJECT ||
760                  locktag->locktag_type == LOCKTAG_RELATION) &&
761                 lockmode > RowExclusiveLock)
762                 ereport(ERROR,
763                                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
764                                  errmsg("cannot acquire lock mode %s on database objects while recovery is in progress",
765                                                 lockMethodTable->lockModeNames[lockmode]),
766                                  errhint("Only RowExclusiveLock or less can be acquired on database objects during recovery.")));
767
768 #ifdef LOCK_DEBUG
769         if (LOCK_DEBUG_ENABLED(locktag))
770                 elog(LOG, "LockAcquire: lock [%u,%u] %s",
771                          locktag->locktag_field1, locktag->locktag_field2,
772                          lockMethodTable->lockModeNames[lockmode]);
773 #endif
774
775         /* Identify owner for lock */
776         if (sessionLock)
777                 owner = NULL;
778         else
779                 owner = CurrentResourceOwner;
780
781         /*
782          * Find or create a LOCALLOCK entry for this lock and lockmode
783          */
784         MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
785         localtag.lock = *locktag;
786         localtag.mode = lockmode;
787
788         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
789                                                                                   (void *) &localtag,
790                                                                                   HASH_ENTER, &found);
791
792         /*
793          * if it's a new locallock object, initialize it
794          */
795         if (!found)
796         {
797                 locallock->lock = NULL;
798                 locallock->proclock = NULL;
799                 locallock->hashcode = LockTagHashCode(&(localtag.lock));
800                 locallock->nLocks = 0;
801                 locallock->holdsStrongLockCount = false;
802                 locallock->lockCleared = false;
803                 locallock->numLockOwners = 0;
804                 locallock->maxLockOwners = 8;
805                 locallock->lockOwners = NULL;   /* in case next line fails */
806                 locallock->lockOwners = (LOCALLOCKOWNER *)
807                         MemoryContextAlloc(TopMemoryContext,
808                                                            locallock->maxLockOwners * sizeof(LOCALLOCKOWNER));
809         }
810         else
811         {
812                 /* Make sure there will be room to remember the lock */
813                 if (locallock->numLockOwners >= locallock->maxLockOwners)
814                 {
815                         int                     newsize = locallock->maxLockOwners * 2;
816
817                         locallock->lockOwners = (LOCALLOCKOWNER *)
818                                 repalloc(locallock->lockOwners,
819                                                  newsize * sizeof(LOCALLOCKOWNER));
820                         locallock->maxLockOwners = newsize;
821                 }
822         }
823         hashcode = locallock->hashcode;
824
825         if (locallockp)
826                 *locallockp = locallock;
827
828         /*
829          * If we already hold the lock, we can just increase the count locally.
830          *
831          * If lockCleared is already set, caller need not worry about absorbing
832          * sinval messages related to the lock's object.
833          */
834         if (locallock->nLocks > 0)
835         {
836                 GrantLockLocal(locallock, owner);
837                 if (locallock->lockCleared)
838                         return LOCKACQUIRE_ALREADY_CLEAR;
839                 else
840                         return LOCKACQUIRE_ALREADY_HELD;
841         }
842
843         /*
844          * Prepare to emit a WAL record if acquisition of this lock needs to be
845          * replayed in a standby server.
846          *
847          * Here we prepare to log; after lock is acquired we'll issue log record.
848          * This arrangement simplifies error recovery in case the preparation step
849          * fails.
850          *
851          * Only AccessExclusiveLocks can conflict with lock types that read-only
852          * transactions can acquire in a standby server. Make sure this definition
853          * matches the one in GetRunningTransactionLocks().
854          */
855         if (lockmode >= AccessExclusiveLock &&
856                 locktag->locktag_type == LOCKTAG_RELATION &&
857                 !RecoveryInProgress() &&
858                 XLogStandbyInfoActive())
859         {
860                 LogAccessExclusiveLockPrepare();
861                 log_lock = true;
862         }
863
864         /*
865          * Attempt to take lock via fast path, if eligible.  But if we remember
866          * having filled up the fast path array, we don't attempt to make any
867          * further use of it until we release some locks.  It's possible that some
868          * other backend has transferred some of those locks to the shared hash
869          * table, leaving space free, but it's not worth acquiring the LWLock just
870          * to check.  It's also possible that we're acquiring a second or third
871          * lock type on a relation we have already locked using the fast-path, but
872          * for now we don't worry about that case either.
873          */
874         if (EligibleForRelationFastPath(locktag, lockmode) &&
875                 FastPathLocalUseCount < FP_LOCK_SLOTS_PER_BACKEND)
876         {
877                 uint32          fasthashcode = FastPathStrongLockHashPartition(hashcode);
878                 bool            acquired;
879
880                 /*
881                  * LWLockAcquire acts as a memory sequencing point, so it's safe to
882                  * assume that any strong locker whose increment to
883                  * FastPathStrongRelationLocks->counts becomes visible after we test
884                  * it has yet to begin to transfer fast-path locks.
885                  */
886                 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
887                 if (FastPathStrongRelationLocks->count[fasthashcode] != 0)
888                         acquired = false;
889                 else
890                         acquired = FastPathGrantRelationLock(locktag->locktag_field2,
891                                                                                                  lockmode);
892                 LWLockRelease(&MyProc->backendLock);
893                 if (acquired)
894                 {
895                         /*
896                          * The locallock might contain stale pointers to some old shared
897                          * objects; we MUST reset these to null before considering the
898                          * lock to be acquired via fast-path.
899                          */
900                         locallock->lock = NULL;
901                         locallock->proclock = NULL;
902                         GrantLockLocal(locallock, owner);
903                         return LOCKACQUIRE_OK;
904                 }
905         }
906
907         /*
908          * If this lock could potentially have been taken via the fast-path by
909          * some other backend, we must (temporarily) disable further use of the
910          * fast-path for this lock tag, and migrate any locks already taken via
911          * this method to the main lock table.
912          */
913         if (ConflictsWithRelationFastPath(locktag, lockmode))
914         {
915                 uint32          fasthashcode = FastPathStrongLockHashPartition(hashcode);
916
917                 BeginStrongLockAcquire(locallock, fasthashcode);
918                 if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
919                                                                                    hashcode))
920                 {
921                         AbortStrongLockAcquire();
922                         if (locallock->nLocks == 0)
923                                 RemoveLocalLock(locallock);
924                         if (locallockp)
925                                 *locallockp = NULL;
926                         if (reportMemoryError)
927                                 ereport(ERROR,
928                                                 (errcode(ERRCODE_OUT_OF_MEMORY),
929                                                  errmsg("out of shared memory"),
930                                                  errhint("You might need to increase max_locks_per_transaction.")));
931                         else
932                                 return LOCKACQUIRE_NOT_AVAIL;
933                 }
934         }
935
936         /*
937          * We didn't find the lock in our LOCALLOCK table, and we didn't manage to
938          * take it via the fast-path, either, so we've got to mess with the shared
939          * lock table.
940          */
941         partitionLock = LockHashPartitionLock(hashcode);
942
943         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
944
945         /*
946          * Find or create lock and proclock entries with this tag
947          *
948          * Note: if the locallock object already existed, it might have a pointer
949          * to the lock already ... but we should not assume that that pointer is
950          * valid, since a lock object with zero hold and request counts can go
951          * away anytime.  So we have to use SetupLockInTable() to recompute the
952          * lock and proclock pointers, even if they're already set.
953          */
954         proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
955                                                                 hashcode, lockmode);
956         if (!proclock)
957         {
958                 AbortStrongLockAcquire();
959                 LWLockRelease(partitionLock);
960                 if (locallock->nLocks == 0)
961                         RemoveLocalLock(locallock);
962                 if (locallockp)
963                         *locallockp = NULL;
964                 if (reportMemoryError)
965                         ereport(ERROR,
966                                         (errcode(ERRCODE_OUT_OF_MEMORY),
967                                          errmsg("out of shared memory"),
968                                          errhint("You might need to increase max_locks_per_transaction.")));
969                 else
970                         return LOCKACQUIRE_NOT_AVAIL;
971         }
972         locallock->proclock = proclock;
973         lock = proclock->tag.myLock;
974         locallock->lock = lock;
975
976         /*
977          * If lock requested conflicts with locks requested by waiters, must join
978          * wait queue.  Otherwise, check for conflict with already-held locks.
979          * (That's last because most complex check.)
980          */
981         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
982                 status = STATUS_FOUND;
983         else
984                 status = LockCheckConflicts(lockMethodTable, lockmode,
985                                                                         lock, proclock);
986
987         if (status == STATUS_OK)
988         {
989                 /* No conflict with held or previously requested locks */
990                 GrantLock(lock, proclock, lockmode);
991                 GrantLockLocal(locallock, owner);
992         }
993         else
994         {
995                 Assert(status == STATUS_FOUND);
996
997                 /*
998                  * We can't acquire the lock immediately.  If caller specified no
999                  * blocking, remove useless table entries and return
1000                  * LOCKACQUIRE_NOT_AVAIL without waiting.
1001                  */
1002                 if (dontWait)
1003                 {
1004                         AbortStrongLockAcquire();
1005                         if (proclock->holdMask == 0)
1006                         {
1007                                 uint32          proclock_hashcode;
1008
1009                                 proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
1010                                 SHMQueueDelete(&proclock->lockLink);
1011                                 SHMQueueDelete(&proclock->procLink);
1012                                 if (!hash_search_with_hash_value(LockMethodProcLockHash,
1013                                                                                                  (void *) &(proclock->tag),
1014                                                                                                  proclock_hashcode,
1015                                                                                                  HASH_REMOVE,
1016                                                                                                  NULL))
1017                                         elog(PANIC, "proclock table corrupted");
1018                         }
1019                         else
1020                                 PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock);
1021                         lock->nRequested--;
1022                         lock->requested[lockmode]--;
1023                         LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode);
1024                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
1025                         Assert(lock->nGranted <= lock->nRequested);
1026                         LWLockRelease(partitionLock);
1027                         if (locallock->nLocks == 0)
1028                                 RemoveLocalLock(locallock);
1029                         if (locallockp)
1030                                 *locallockp = NULL;
1031                         return LOCKACQUIRE_NOT_AVAIL;
1032                 }
1033
1034                 /*
1035                  * Set bitmask of locks this process already holds on this object.
1036                  */
1037                 MyProc->heldLocks = proclock->holdMask;
1038
1039                 /*
1040                  * Sleep till someone wakes me up.
1041                  */
1042
1043                 TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1,
1044                                                                                  locktag->locktag_field2,
1045                                                                                  locktag->locktag_field3,
1046                                                                                  locktag->locktag_field4,
1047                                                                                  locktag->locktag_type,
1048                                                                                  lockmode);
1049
1050                 WaitOnLock(locallock, owner);
1051
1052                 TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1,
1053                                                                                 locktag->locktag_field2,
1054                                                                                 locktag->locktag_field3,
1055                                                                                 locktag->locktag_field4,
1056                                                                                 locktag->locktag_type,
1057                                                                                 lockmode);
1058
1059                 /*
1060                  * NOTE: do not do any material change of state between here and
1061                  * return.  All required changes in locktable state must have been
1062                  * done when the lock was granted to us --- see notes in WaitOnLock.
1063                  */
1064
1065                 /*
1066                  * Check the proclock entry status, in case something in the ipc
1067                  * communication doesn't work correctly.
1068                  */
1069                 if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
1070                 {
1071                         AbortStrongLockAcquire();
1072                         PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
1073                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
1074                         /* Should we retry ? */
1075                         LWLockRelease(partitionLock);
1076                         elog(ERROR, "LockAcquire failed");
1077                 }
1078                 PROCLOCK_PRINT("LockAcquire: granted", proclock);
1079                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
1080         }
1081
1082         /*
1083          * Lock state is fully up-to-date now; if we error out after this, no
1084          * special error cleanup is required.
1085          */
1086         FinishStrongLockAcquire();
1087
1088         LWLockRelease(partitionLock);
1089
1090         /*
1091          * Emit a WAL record if acquisition of this lock needs to be replayed in a
1092          * standby server.
1093          */
1094         if (log_lock)
1095         {
1096                 /*
1097                  * Decode the locktag back to the original values, to avoid sending
1098                  * lots of empty bytes with every message.  See lock.h to check how a
1099                  * locktag is defined for LOCKTAG_RELATION
1100                  */
1101                 LogAccessExclusiveLock(locktag->locktag_field1,
1102                                                            locktag->locktag_field2);
1103         }
1104
1105         return LOCKACQUIRE_OK;
1106 }
1107
1108 /*
1109  * Find or create LOCK and PROCLOCK objects as needed for a new lock
1110  * request.
1111  *
1112  * Returns the PROCLOCK object, or NULL if we failed to create the objects
1113  * for lack of shared memory.
1114  *
1115  * The appropriate partition lock must be held at entry, and will be
1116  * held at exit.
1117  */
1118 static PROCLOCK *
1119 SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
1120                                  const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode)
1121 {
1122         LOCK       *lock;
1123         PROCLOCK   *proclock;
1124         PROCLOCKTAG proclocktag;
1125         uint32          proclock_hashcode;
1126         bool            found;
1127
1128         /*
1129          * Find or create a lock with this tag.
1130          */
1131         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
1132                                                                                                 (const void *) locktag,
1133                                                                                                 hashcode,
1134                                                                                                 HASH_ENTER_NULL,
1135                                                                                                 &found);
1136         if (!lock)
1137                 return NULL;
1138
1139         /*
1140          * if it's a new lock object, initialize it
1141          */
1142         if (!found)
1143         {
1144                 lock->grantMask = 0;
1145                 lock->waitMask = 0;
1146                 SHMQueueInit(&(lock->procLocks));
1147                 ProcQueueInit(&(lock->waitProcs));
1148                 lock->nRequested = 0;
1149                 lock->nGranted = 0;
1150                 MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
1151                 MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
1152                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
1153         }
1154         else
1155         {
1156                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
1157                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1158                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1159                 Assert(lock->nGranted <= lock->nRequested);
1160         }
1161
1162         /*
1163          * Create the hash key for the proclock table.
1164          */
1165         proclocktag.myLock = lock;
1166         proclocktag.myProc = proc;
1167
1168         proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
1169
1170         /*
1171          * Find or create a proclock entry with this tag
1172          */
1173         proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
1174                                                                                                                 (void *) &proclocktag,
1175                                                                                                                 proclock_hashcode,
1176                                                                                                                 HASH_ENTER_NULL,
1177                                                                                                                 &found);
1178         if (!proclock)
1179         {
1180                 /* Oops, not enough shmem for the proclock */
1181                 if (lock->nRequested == 0)
1182                 {
1183                         /*
1184                          * There are no other requestors of this lock, so garbage-collect
1185                          * the lock object.  We *must* do this to avoid a permanent leak
1186                          * of shared memory, because there won't be anything to cause
1187                          * anyone to release the lock object later.
1188                          */
1189                         Assert(SHMQueueEmpty(&(lock->procLocks)));
1190                         if (!hash_search_with_hash_value(LockMethodLockHash,
1191                                                                                          (void *) &(lock->tag),
1192                                                                                          hashcode,
1193                                                                                          HASH_REMOVE,
1194                                                                                          NULL))
1195                                 elog(PANIC, "lock table corrupted");
1196                 }
1197                 return NULL;
1198         }
1199
1200         /*
1201          * If new, initialize the new entry
1202          */
1203         if (!found)
1204         {
1205                 uint32          partition = LockHashPartition(hashcode);
1206
1207                 /*
1208                  * It might seem unsafe to access proclock->groupLeader without a
1209                  * lock, but it's not really.  Either we are initializing a proclock
1210                  * on our own behalf, in which case our group leader isn't changing
1211                  * because the group leader for a process can only ever be changed by
1212                  * the process itself; or else we are transferring a fast-path lock to
1213                  * the main lock table, in which case that process can't change it's
1214                  * lock group leader without first releasing all of its locks (and in
1215                  * particular the one we are currently transferring).
1216                  */
1217                 proclock->groupLeader = proc->lockGroupLeader != NULL ?
1218                         proc->lockGroupLeader : proc;
1219                 proclock->holdMask = 0;
1220                 proclock->releaseMask = 0;
1221                 /* Add proclock to appropriate lists */
1222                 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
1223                 SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
1224                                                          &proclock->procLink);
1225                 PROCLOCK_PRINT("LockAcquire: new", proclock);
1226         }
1227         else
1228         {
1229                 PROCLOCK_PRINT("LockAcquire: found", proclock);
1230                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
1231
1232 #ifdef CHECK_DEADLOCK_RISK
1233
1234                 /*
1235                  * Issue warning if we already hold a lower-level lock on this object
1236                  * and do not hold a lock of the requested level or higher. This
1237                  * indicates a deadlock-prone coding practice (eg, we'd have a
1238                  * deadlock if another backend were following the same code path at
1239                  * about the same time).
1240                  *
1241                  * This is not enabled by default, because it may generate log entries
1242                  * about user-level coding practices that are in fact safe in context.
1243                  * It can be enabled to help find system-level problems.
1244                  *
1245                  * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
1246                  * better to use a table.  For now, though, this works.
1247                  */
1248                 {
1249                         int                     i;
1250
1251                         for (i = lockMethodTable->numLockModes; i > 0; i--)
1252                         {
1253                                 if (proclock->holdMask & LOCKBIT_ON(i))
1254                                 {
1255                                         if (i >= (int) lockmode)
1256                                                 break;  /* safe: we have a lock >= req level */
1257                                         elog(LOG, "deadlock risk: raising lock level"
1258                                                  " from %s to %s on object %u/%u/%u",
1259                                                  lockMethodTable->lockModeNames[i],
1260                                                  lockMethodTable->lockModeNames[lockmode],
1261                                                  lock->tag.locktag_field1, lock->tag.locktag_field2,
1262                                                  lock->tag.locktag_field3);
1263                                         break;
1264                                 }
1265                         }
1266                 }
1267 #endif                                                  /* CHECK_DEADLOCK_RISK */
1268         }
1269
1270         /*
1271          * lock->nRequested and lock->requested[] count the total number of
1272          * requests, whether granted or waiting, so increment those immediately.
1273          * The other counts don't increment till we get the lock.
1274          */
1275         lock->nRequested++;
1276         lock->requested[lockmode]++;
1277         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1278
1279         /*
1280          * We shouldn't already hold the desired lock; else locallock table is
1281          * broken.
1282          */
1283         if (proclock->holdMask & LOCKBIT_ON(lockmode))
1284                 elog(ERROR, "lock %s on object %u/%u/%u is already held",
1285                          lockMethodTable->lockModeNames[lockmode],
1286                          lock->tag.locktag_field1, lock->tag.locktag_field2,
1287                          lock->tag.locktag_field3);
1288
1289         return proclock;
1290 }
1291
1292 /*
1293  * Subroutine to free a locallock entry
1294  */
1295 static void
1296 RemoveLocalLock(LOCALLOCK *locallock)
1297 {
1298         int                     i;
1299
1300         for (i = locallock->numLockOwners - 1; i >= 0; i--)
1301         {
1302                 if (locallock->lockOwners[i].owner != NULL)
1303                         ResourceOwnerForgetLock(locallock->lockOwners[i].owner, locallock);
1304         }
1305         locallock->numLockOwners = 0;
1306         if (locallock->lockOwners != NULL)
1307                 pfree(locallock->lockOwners);
1308         locallock->lockOwners = NULL;
1309
1310         if (locallock->holdsStrongLockCount)
1311         {
1312                 uint32          fasthashcode;
1313
1314                 fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1315
1316                 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1317                 Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1318                 FastPathStrongRelationLocks->count[fasthashcode]--;
1319                 locallock->holdsStrongLockCount = false;
1320                 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1321         }
1322
1323         if (!hash_search(LockMethodLocalHash,
1324                                          (void *) &(locallock->tag),
1325                                          HASH_REMOVE, NULL))
1326                 elog(WARNING, "locallock table corrupted");
1327 }
1328
1329 /*
1330  * LockCheckConflicts -- test whether requested lock conflicts
1331  *              with those already granted
1332  *
1333  * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
1334  *
1335  * NOTES:
1336  *              Here's what makes this complicated: one process's locks don't
1337  * conflict with one another, no matter what purpose they are held for
1338  * (eg, session and transaction locks do not conflict).  Nor do the locks
1339  * of one process in a lock group conflict with those of another process in
1340  * the same group.  So, we must subtract off these locks when determining
1341  * whether the requested new lock conflicts with those already held.
1342  */
1343 int
1344 LockCheckConflicts(LockMethod lockMethodTable,
1345                                    LOCKMODE lockmode,
1346                                    LOCK *lock,
1347                                    PROCLOCK *proclock)
1348 {
1349         int                     numLockModes = lockMethodTable->numLockModes;
1350         LOCKMASK        myLocks;
1351         int                     conflictMask = lockMethodTable->conflictTab[lockmode];
1352         int                     conflictsRemaining[MAX_LOCKMODES];
1353         int                     totalConflictsRemaining = 0;
1354         int                     i;
1355         SHM_QUEUE  *procLocks;
1356         PROCLOCK   *otherproclock;
1357
1358         /*
1359          * first check for global conflicts: If no locks conflict with my request,
1360          * then I get the lock.
1361          *
1362          * Checking for conflict: lock->grantMask represents the types of
1363          * currently held locks.  conflictTable[lockmode] has a bit set for each
1364          * type of lock that conflicts with request.   Bitwise compare tells if
1365          * there is a conflict.
1366          */
1367         if (!(conflictMask & lock->grantMask))
1368         {
1369                 PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock);
1370                 return STATUS_OK;
1371         }
1372
1373         /*
1374          * Rats.  Something conflicts.  But it could still be my own lock, or a
1375          * lock held by another member of my locking group.  First, figure out how
1376          * many conflicts remain after subtracting out any locks I hold myself.
1377          */
1378         myLocks = proclock->holdMask;
1379         for (i = 1; i <= numLockModes; i++)
1380         {
1381                 if ((conflictMask & LOCKBIT_ON(i)) == 0)
1382                 {
1383                         conflictsRemaining[i] = 0;
1384                         continue;
1385                 }
1386                 conflictsRemaining[i] = lock->granted[i];
1387                 if (myLocks & LOCKBIT_ON(i))
1388                         --conflictsRemaining[i];
1389                 totalConflictsRemaining += conflictsRemaining[i];
1390         }
1391
1392         /* If no conflicts remain, we get the lock. */
1393         if (totalConflictsRemaining == 0)
1394         {
1395                 PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock);
1396                 return STATUS_OK;
1397         }
1398
1399         /* If no group locking, it's definitely a conflict. */
1400         if (proclock->groupLeader == MyProc && MyProc->lockGroupLeader == NULL)
1401         {
1402                 Assert(proclock->tag.myProc == MyProc);
1403                 PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)",
1404                                            proclock);
1405                 return STATUS_FOUND;
1406         }
1407
1408         /*
1409          * Locks held in conflicting modes by members of our own lock group are
1410          * not real conflicts; we can subtract those out and see if we still have
1411          * a conflict.  This is O(N) in the number of processes holding or
1412          * awaiting locks on this object.  We could improve that by making the
1413          * shared memory state more complex (and larger) but it doesn't seem worth
1414          * it.
1415          */
1416         procLocks = &(lock->procLocks);
1417         otherproclock = (PROCLOCK *)
1418                 SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink));
1419         while (otherproclock != NULL)
1420         {
1421                 if (proclock != otherproclock &&
1422                         proclock->groupLeader == otherproclock->groupLeader &&
1423                         (otherproclock->holdMask & conflictMask) != 0)
1424                 {
1425                         int                     intersectMask = otherproclock->holdMask & conflictMask;
1426
1427                         for (i = 1; i <= numLockModes; i++)
1428                         {
1429                                 if ((intersectMask & LOCKBIT_ON(i)) != 0)
1430                                 {
1431                                         if (conflictsRemaining[i] <= 0)
1432                                                 elog(PANIC, "proclocks held do not match lock");
1433                                         conflictsRemaining[i]--;
1434                                         totalConflictsRemaining--;
1435                                 }
1436                         }
1437
1438                         if (totalConflictsRemaining == 0)
1439                         {
1440                                 PROCLOCK_PRINT("LockCheckConflicts: resolved (group)",
1441                                                            proclock);
1442                                 return STATUS_OK;
1443                         }
1444                 }
1445                 otherproclock = (PROCLOCK *)
1446                         SHMQueueNext(procLocks, &otherproclock->lockLink,
1447                                                  offsetof(PROCLOCK, lockLink));
1448         }
1449
1450         /* Nope, it's a real conflict. */
1451         PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock);
1452         return STATUS_FOUND;
1453 }
1454
1455 /*
1456  * GrantLock -- update the lock and proclock data structures to show
1457  *              the lock request has been granted.
1458  *
1459  * NOTE: if proc was blocked, it also needs to be removed from the wait list
1460  * and have its waitLock/waitProcLock fields cleared.  That's not done here.
1461  *
1462  * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK
1463  * table entry; but since we may be awaking some other process, we can't do
1464  * that here; it's done by GrantLockLocal, instead.
1465  */
1466 void
1467 GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode)
1468 {
1469         lock->nGranted++;
1470         lock->granted[lockmode]++;
1471         lock->grantMask |= LOCKBIT_ON(lockmode);
1472         if (lock->granted[lockmode] == lock->requested[lockmode])
1473                 lock->waitMask &= LOCKBIT_OFF(lockmode);
1474         proclock->holdMask |= LOCKBIT_ON(lockmode);
1475         LOCK_PRINT("GrantLock", lock, lockmode);
1476         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1477         Assert(lock->nGranted <= lock->nRequested);
1478 }
1479
1480 /*
1481  * UnGrantLock -- opposite of GrantLock.
1482  *
1483  * Updates the lock and proclock data structures to show that the lock
1484  * is no longer held nor requested by the current holder.
1485  *
1486  * Returns true if there were any waiters waiting on the lock that
1487  * should now be woken up with ProcLockWakeup.
1488  */
1489 static bool
1490 UnGrantLock(LOCK *lock, LOCKMODE lockmode,
1491                         PROCLOCK *proclock, LockMethod lockMethodTable)
1492 {
1493         bool            wakeupNeeded = false;
1494
1495         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1496         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1497         Assert(lock->nGranted <= lock->nRequested);
1498
1499         /*
1500          * fix the general lock stats
1501          */
1502         lock->nRequested--;
1503         lock->requested[lockmode]--;
1504         lock->nGranted--;
1505         lock->granted[lockmode]--;
1506
1507         if (lock->granted[lockmode] == 0)
1508         {
1509                 /* change the conflict mask.  No more of this lock type. */
1510                 lock->grantMask &= LOCKBIT_OFF(lockmode);
1511         }
1512
1513         LOCK_PRINT("UnGrantLock: updated", lock, lockmode);
1514
1515         /*
1516          * We need only run ProcLockWakeup if the released lock conflicts with at
1517          * least one of the lock types requested by waiter(s).  Otherwise whatever
1518          * conflict made them wait must still exist.  NOTE: before MVCC, we could
1519          * skip wakeup if lock->granted[lockmode] was still positive. But that's
1520          * not true anymore, because the remaining granted locks might belong to
1521          * some waiter, who could now be awakened because he doesn't conflict with
1522          * his own locks.
1523          */
1524         if (lockMethodTable->conflictTab[lockmode] & lock->waitMask)
1525                 wakeupNeeded = true;
1526
1527         /*
1528          * Now fix the per-proclock state.
1529          */
1530         proclock->holdMask &= LOCKBIT_OFF(lockmode);
1531         PROCLOCK_PRINT("UnGrantLock: updated", proclock);
1532
1533         return wakeupNeeded;
1534 }
1535
1536 /*
1537  * CleanUpLock -- clean up after releasing a lock.  We garbage-collect the
1538  * proclock and lock objects if possible, and call ProcLockWakeup if there
1539  * are remaining requests and the caller says it's OK.  (Normally, this
1540  * should be called after UnGrantLock, and wakeupNeeded is the result from
1541  * UnGrantLock.)
1542  *
1543  * The appropriate partition lock must be held at entry, and will be
1544  * held at exit.
1545  */
1546 static void
1547 CleanUpLock(LOCK *lock, PROCLOCK *proclock,
1548                         LockMethod lockMethodTable, uint32 hashcode,
1549                         bool wakeupNeeded)
1550 {
1551         /*
1552          * If this was my last hold on this lock, delete my entry in the proclock
1553          * table.
1554          */
1555         if (proclock->holdMask == 0)
1556         {
1557                 uint32          proclock_hashcode;
1558
1559                 PROCLOCK_PRINT("CleanUpLock: deleting", proclock);
1560                 SHMQueueDelete(&proclock->lockLink);
1561                 SHMQueueDelete(&proclock->procLink);
1562                 proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode);
1563                 if (!hash_search_with_hash_value(LockMethodProcLockHash,
1564                                                                                  (void *) &(proclock->tag),
1565                                                                                  proclock_hashcode,
1566                                                                                  HASH_REMOVE,
1567                                                                                  NULL))
1568                         elog(PANIC, "proclock table corrupted");
1569         }
1570
1571         if (lock->nRequested == 0)
1572         {
1573                 /*
1574                  * The caller just released the last lock, so garbage-collect the lock
1575                  * object.
1576                  */
1577                 LOCK_PRINT("CleanUpLock: deleting", lock, 0);
1578                 Assert(SHMQueueEmpty(&(lock->procLocks)));
1579                 if (!hash_search_with_hash_value(LockMethodLockHash,
1580                                                                                  (void *) &(lock->tag),
1581                                                                                  hashcode,
1582                                                                                  HASH_REMOVE,
1583                                                                                  NULL))
1584                         elog(PANIC, "lock table corrupted");
1585         }
1586         else if (wakeupNeeded)
1587         {
1588                 /* There are waiters on this lock, so wake them up. */
1589                 ProcLockWakeup(lockMethodTable, lock);
1590         }
1591 }
1592
1593 /*
1594  * GrantLockLocal -- update the locallock data structures to show
1595  *              the lock request has been granted.
1596  *
1597  * We expect that LockAcquire made sure there is room to add a new
1598  * ResourceOwner entry.
1599  */
1600 static void
1601 GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
1602 {
1603         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1604         int                     i;
1605
1606         Assert(locallock->numLockOwners < locallock->maxLockOwners);
1607         /* Count the total */
1608         locallock->nLocks++;
1609         /* Count the per-owner lock */
1610         for (i = 0; i < locallock->numLockOwners; i++)
1611         {
1612                 if (lockOwners[i].owner == owner)
1613                 {
1614                         lockOwners[i].nLocks++;
1615                         return;
1616                 }
1617         }
1618         lockOwners[i].owner = owner;
1619         lockOwners[i].nLocks = 1;
1620         locallock->numLockOwners++;
1621         if (owner != NULL)
1622                 ResourceOwnerRememberLock(owner, locallock);
1623 }
1624
1625 /*
1626  * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
1627  * and arrange for error cleanup if it fails
1628  */
1629 static void
1630 BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
1631 {
1632         Assert(StrongLockInProgress == NULL);
1633         Assert(locallock->holdsStrongLockCount == false);
1634
1635         /*
1636          * Adding to a memory location is not atomic, so we take a spinlock to
1637          * ensure we don't collide with someone else trying to bump the count at
1638          * the same time.
1639          *
1640          * XXX: It might be worth considering using an atomic fetch-and-add
1641          * instruction here, on architectures where that is supported.
1642          */
1643
1644         SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1645         FastPathStrongRelationLocks->count[fasthashcode]++;
1646         locallock->holdsStrongLockCount = true;
1647         StrongLockInProgress = locallock;
1648         SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1649 }
1650
1651 /*
1652  * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
1653  * acquisition once it's no longer needed
1654  */
1655 static void
1656 FinishStrongLockAcquire(void)
1657 {
1658         StrongLockInProgress = NULL;
1659 }
1660
1661 /*
1662  * AbortStrongLockAcquire - undo strong lock state changes performed by
1663  * BeginStrongLockAcquire.
1664  */
1665 void
1666 AbortStrongLockAcquire(void)
1667 {
1668         uint32          fasthashcode;
1669         LOCALLOCK  *locallock = StrongLockInProgress;
1670
1671         if (locallock == NULL)
1672                 return;
1673
1674         fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
1675         Assert(locallock->holdsStrongLockCount == true);
1676         SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
1677         Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
1678         FastPathStrongRelationLocks->count[fasthashcode]--;
1679         locallock->holdsStrongLockCount = false;
1680         StrongLockInProgress = NULL;
1681         SpinLockRelease(&FastPathStrongRelationLocks->mutex);
1682 }
1683
1684 /*
1685  * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
1686  *              WaitOnLock on.
1687  *
1688  * proc.c needs this for the case where we are booted off the lock by
1689  * timeout, but discover that someone granted us the lock anyway.
1690  *
1691  * We could just export GrantLockLocal, but that would require including
1692  * resowner.h in lock.h, which creates circularity.
1693  */
1694 void
1695 GrantAwaitedLock(void)
1696 {
1697         GrantLockLocal(awaitedLock, awaitedOwner);
1698 }
1699
1700 /*
1701  * MarkLockClear -- mark an acquired lock as "clear"
1702  *
1703  * This means that we know we have absorbed all sinval messages that other
1704  * sessions generated before we acquired this lock, and so we can confidently
1705  * assume we know about any catalog changes protected by this lock.
1706  */
1707 void
1708 MarkLockClear(LOCALLOCK *locallock)
1709 {
1710         Assert(locallock->nLocks > 0);
1711         locallock->lockCleared = true;
1712 }
1713
1714 /*
1715  * WaitOnLock -- wait to acquire a lock
1716  *
1717  * Caller must have set MyProc->heldLocks to reflect locks already held
1718  * on the lockable object by this process.
1719  *
1720  * The appropriate partition lock must be held at entry.
1721  */
1722 static void
1723 WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
1724 {
1725         LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock);
1726         LockMethod      lockMethodTable = LockMethods[lockmethodid];
1727         char       *volatile new_status = NULL;
1728
1729         LOCK_PRINT("WaitOnLock: sleeping on lock",
1730                            locallock->lock, locallock->tag.mode);
1731
1732         /* Report change to waiting status */
1733         if (update_process_title)
1734         {
1735                 const char *old_status;
1736                 int                     len;
1737
1738                 old_status = get_ps_display(&len);
1739                 new_status = (char *) palloc(len + 8 + 1);
1740                 memcpy(new_status, old_status, len);
1741                 strcpy(new_status + len, " waiting");
1742                 set_ps_display(new_status, false);
1743                 new_status[len] = '\0'; /* truncate off " waiting" */
1744         }
1745
1746         awaitedLock = locallock;
1747         awaitedOwner = owner;
1748
1749         /*
1750          * NOTE: Think not to put any shared-state cleanup after the call to
1751          * ProcSleep, in either the normal or failure path.  The lock state must
1752          * be fully set by the lock grantor, or by CheckDeadLock if we give up
1753          * waiting for the lock.  This is necessary because of the possibility
1754          * that a cancel/die interrupt will interrupt ProcSleep after someone else
1755          * grants us the lock, but before we've noticed it. Hence, after granting,
1756          * the locktable state must fully reflect the fact that we own the lock;
1757          * we can't do additional work on return.
1758          *
1759          * We can and do use a PG_TRY block to try to clean up after failure, but
1760          * this still has a major limitation: elog(FATAL) can occur while waiting
1761          * (eg, a "die" interrupt), and then control won't come back here. So all
1762          * cleanup of essential state should happen in LockErrorCleanup, not here.
1763          * We can use PG_TRY to clear the "waiting" status flags, since doing that
1764          * is unimportant if the process exits.
1765          */
1766         PG_TRY();
1767         {
1768                 if (ProcSleep(locallock, lockMethodTable) != STATUS_OK)
1769                 {
1770                         /*
1771                          * We failed as a result of a deadlock, see CheckDeadLock(). Quit
1772                          * now.
1773                          */
1774                         awaitedLock = NULL;
1775                         LOCK_PRINT("WaitOnLock: aborting on lock",
1776                                            locallock->lock, locallock->tag.mode);
1777                         LWLockRelease(LockHashPartitionLock(locallock->hashcode));
1778
1779                         /*
1780                          * Now that we aren't holding the partition lock, we can give an
1781                          * error report including details about the detected deadlock.
1782                          */
1783                         DeadLockReport();
1784                         /* not reached */
1785                 }
1786         }
1787         PG_CATCH();
1788         {
1789                 /* In this path, awaitedLock remains set until LockErrorCleanup */
1790
1791                 /* Report change to non-waiting status */
1792                 if (update_process_title)
1793                 {
1794                         set_ps_display(new_status, false);
1795                         pfree(new_status);
1796                 }
1797
1798                 /* and propagate the error */
1799                 PG_RE_THROW();
1800         }
1801         PG_END_TRY();
1802
1803         awaitedLock = NULL;
1804
1805         /* Report change to non-waiting status */
1806         if (update_process_title)
1807         {
1808                 set_ps_display(new_status, false);
1809                 pfree(new_status);
1810         }
1811
1812         LOCK_PRINT("WaitOnLock: wakeup on lock",
1813                            locallock->lock, locallock->tag.mode);
1814 }
1815
1816 /*
1817  * Remove a proc from the wait-queue it is on (caller must know it is on one).
1818  * This is only used when the proc has failed to get the lock, so we set its
1819  * waitStatus to STATUS_ERROR.
1820  *
1821  * Appropriate partition lock must be held by caller.  Also, caller is
1822  * responsible for signaling the proc if needed.
1823  *
1824  * NB: this does not clean up any locallock object that may exist for the lock.
1825  */
1826 void
1827 RemoveFromWaitQueue(PGPROC *proc, uint32 hashcode)
1828 {
1829         LOCK       *waitLock = proc->waitLock;
1830         PROCLOCK   *proclock = proc->waitProcLock;
1831         LOCKMODE        lockmode = proc->waitLockMode;
1832         LOCKMETHODID lockmethodid = LOCK_LOCKMETHOD(*waitLock);
1833
1834         /* Make sure proc is waiting */
1835         Assert(proc->waitStatus == STATUS_WAITING);
1836         Assert(proc->links.next != NULL);
1837         Assert(waitLock);
1838         Assert(waitLock->waitProcs.size > 0);
1839         Assert(0 < lockmethodid && lockmethodid < lengthof(LockMethods));
1840
1841         /* Remove proc from lock's wait queue */
1842         SHMQueueDelete(&(proc->links));
1843         waitLock->waitProcs.size--;
1844
1845         /* Undo increments of request counts by waiting process */
1846         Assert(waitLock->nRequested > 0);
1847         Assert(waitLock->nRequested > proc->waitLock->nGranted);
1848         waitLock->nRequested--;
1849         Assert(waitLock->requested[lockmode] > 0);
1850         waitLock->requested[lockmode]--;
1851         /* don't forget to clear waitMask bit if appropriate */
1852         if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
1853                 waitLock->waitMask &= LOCKBIT_OFF(lockmode);
1854
1855         /* Clean up the proc's own state, and pass it the ok/fail signal */
1856         proc->waitLock = NULL;
1857         proc->waitProcLock = NULL;
1858         proc->waitStatus = STATUS_ERROR;
1859
1860         /*
1861          * Delete the proclock immediately if it represents no already-held locks.
1862          * (This must happen now because if the owner of the lock decides to
1863          * release it, and the requested/granted counts then go to zero,
1864          * LockRelease expects there to be no remaining proclocks.) Then see if
1865          * any other waiters for the lock can be woken up now.
1866          */
1867         CleanUpLock(waitLock, proclock,
1868                                 LockMethods[lockmethodid], hashcode,
1869                                 true);
1870 }
1871
1872 /*
1873  * LockRelease -- look up 'locktag' and release one 'lockmode' lock on it.
1874  *              Release a session lock if 'sessionLock' is true, else release a
1875  *              regular transaction lock.
1876  *
1877  * Side Effects: find any waiting processes that are now wakable,
1878  *              grant them their requested locks and awaken them.
1879  *              (We have to grant the lock here to avoid a race between
1880  *              the waking process and any new process to
1881  *              come along and request the lock.)
1882  */
1883 bool
1884 LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
1885 {
1886         LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
1887         LockMethod      lockMethodTable;
1888         LOCALLOCKTAG localtag;
1889         LOCALLOCK  *locallock;
1890         LOCK       *lock;
1891         PROCLOCK   *proclock;
1892         LWLock     *partitionLock;
1893         bool            wakeupNeeded;
1894
1895         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
1896                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
1897         lockMethodTable = LockMethods[lockmethodid];
1898         if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
1899                 elog(ERROR, "unrecognized lock mode: %d", lockmode);
1900
1901 #ifdef LOCK_DEBUG
1902         if (LOCK_DEBUG_ENABLED(locktag))
1903                 elog(LOG, "LockRelease: lock [%u,%u] %s",
1904                          locktag->locktag_field1, locktag->locktag_field2,
1905                          lockMethodTable->lockModeNames[lockmode]);
1906 #endif
1907
1908         /*
1909          * Find the LOCALLOCK entry for this lock and lockmode
1910          */
1911         MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
1912         localtag.lock = *locktag;
1913         localtag.mode = lockmode;
1914
1915         locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
1916                                                                                   (void *) &localtag,
1917                                                                                   HASH_FIND, NULL);
1918
1919         /*
1920          * let the caller print its own error message, too. Do not ereport(ERROR).
1921          */
1922         if (!locallock || locallock->nLocks <= 0)
1923         {
1924                 elog(WARNING, "you don't own a lock of type %s",
1925                          lockMethodTable->lockModeNames[lockmode]);
1926                 return false;
1927         }
1928
1929         /*
1930          * Decrease the count for the resource owner.
1931          */
1932         {
1933                 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
1934                 ResourceOwner owner;
1935                 int                     i;
1936
1937                 /* Identify owner for lock */
1938                 if (sessionLock)
1939                         owner = NULL;
1940                 else
1941                         owner = CurrentResourceOwner;
1942
1943                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
1944                 {
1945                         if (lockOwners[i].owner == owner)
1946                         {
1947                                 Assert(lockOwners[i].nLocks > 0);
1948                                 if (--lockOwners[i].nLocks == 0)
1949                                 {
1950                                         if (owner != NULL)
1951                                                 ResourceOwnerForgetLock(owner, locallock);
1952                                         /* compact out unused slot */
1953                                         locallock->numLockOwners--;
1954                                         if (i < locallock->numLockOwners)
1955                                                 lockOwners[i] = lockOwners[locallock->numLockOwners];
1956                                 }
1957                                 break;
1958                         }
1959                 }
1960                 if (i < 0)
1961                 {
1962                         /* don't release a lock belonging to another owner */
1963                         elog(WARNING, "you don't own a lock of type %s",
1964                                  lockMethodTable->lockModeNames[lockmode]);
1965                         return false;
1966                 }
1967         }
1968
1969         /*
1970          * Decrease the total local count.  If we're still holding the lock, we're
1971          * done.
1972          */
1973         locallock->nLocks--;
1974
1975         if (locallock->nLocks > 0)
1976                 return true;
1977
1978         /*
1979          * At this point we can no longer suppose we are clear of invalidation
1980          * messages related to this lock.  Although we'll delete the LOCALLOCK
1981          * object before any intentional return from this routine, it seems worth
1982          * the trouble to explicitly reset lockCleared right now, just in case
1983          * some error prevents us from deleting the LOCALLOCK.
1984          */
1985         locallock->lockCleared = false;
1986
1987         /* Attempt fast release of any lock eligible for the fast path. */
1988         if (EligibleForRelationFastPath(locktag, lockmode) &&
1989                 FastPathLocalUseCount > 0)
1990         {
1991                 bool            released;
1992
1993                 /*
1994                  * We might not find the lock here, even if we originally entered it
1995                  * here.  Another backend may have moved it to the main table.
1996                  */
1997                 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
1998                 released = FastPathUnGrantRelationLock(locktag->locktag_field2,
1999                                                                                            lockmode);
2000                 LWLockRelease(&MyProc->backendLock);
2001                 if (released)
2002                 {
2003                         RemoveLocalLock(locallock);
2004                         return true;
2005                 }
2006         }
2007
2008         /*
2009          * Otherwise we've got to mess with the shared lock table.
2010          */
2011         partitionLock = LockHashPartitionLock(locallock->hashcode);
2012
2013         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2014
2015         /*
2016          * Normally, we don't need to re-find the lock or proclock, since we kept
2017          * their addresses in the locallock table, and they couldn't have been
2018          * removed while we were holding a lock on them.  But it's possible that
2019          * the lock was taken fast-path and has since been moved to the main hash
2020          * table by another backend, in which case we will need to look up the
2021          * objects here.  We assume the lock field is NULL if so.
2022          */
2023         lock = locallock->lock;
2024         if (!lock)
2025         {
2026                 PROCLOCKTAG proclocktag;
2027
2028                 Assert(EligibleForRelationFastPath(locktag, lockmode));
2029                 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2030                                                                                                         (const void *) locktag,
2031                                                                                                         locallock->hashcode,
2032                                                                                                         HASH_FIND,
2033                                                                                                         NULL);
2034                 if (!lock)
2035                         elog(ERROR, "failed to re-find shared lock object");
2036                 locallock->lock = lock;
2037
2038                 proclocktag.myLock = lock;
2039                 proclocktag.myProc = MyProc;
2040                 locallock->proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash,
2041                                                                                                            (void *) &proclocktag,
2042                                                                                                            HASH_FIND,
2043                                                                                                            NULL);
2044                 if (!locallock->proclock)
2045                         elog(ERROR, "failed to re-find shared proclock object");
2046         }
2047         LOCK_PRINT("LockRelease: found", lock, lockmode);
2048         proclock = locallock->proclock;
2049         PROCLOCK_PRINT("LockRelease: found", proclock);
2050
2051         /*
2052          * Double-check that we are actually holding a lock of the type we want to
2053          * release.
2054          */
2055         if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
2056         {
2057                 PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock);
2058                 LWLockRelease(partitionLock);
2059                 elog(WARNING, "you don't own a lock of type %s",
2060                          lockMethodTable->lockModeNames[lockmode]);
2061                 RemoveLocalLock(locallock);
2062                 return false;
2063         }
2064
2065         /*
2066          * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
2067          */
2068         wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
2069
2070         CleanUpLock(lock, proclock,
2071                                 lockMethodTable, locallock->hashcode,
2072                                 wakeupNeeded);
2073
2074         LWLockRelease(partitionLock);
2075
2076         RemoveLocalLock(locallock);
2077         return true;
2078 }
2079
2080 /*
2081  * LockReleaseAll -- Release all locks of the specified lock method that
2082  *              are held by the current process.
2083  *
2084  * Well, not necessarily *all* locks.  The available behaviors are:
2085  *              allLocks == true: release all locks including session locks.
2086  *              allLocks == false: release all non-session locks.
2087  */
2088 void
2089 LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
2090 {
2091         HASH_SEQ_STATUS status;
2092         LockMethod      lockMethodTable;
2093         int                     i,
2094                                 numLockModes;
2095         LOCALLOCK  *locallock;
2096         LOCK       *lock;
2097         PROCLOCK   *proclock;
2098         int                     partition;
2099         bool            have_fast_path_lwlock = false;
2100
2101         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2102                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2103         lockMethodTable = LockMethods[lockmethodid];
2104
2105 #ifdef LOCK_DEBUG
2106         if (*(lockMethodTable->trace_flag))
2107                 elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid);
2108 #endif
2109
2110         /*
2111          * Get rid of our fast-path VXID lock, if appropriate.  Note that this is
2112          * the only way that the lock we hold on our own VXID can ever get
2113          * released: it is always and only released when a toplevel transaction
2114          * ends.
2115          */
2116         if (lockmethodid == DEFAULT_LOCKMETHOD)
2117                 VirtualXactLockTableCleanup();
2118
2119         numLockModes = lockMethodTable->numLockModes;
2120
2121         /*
2122          * First we run through the locallock table and get rid of unwanted
2123          * entries, then we scan the process's proclocks and get rid of those. We
2124          * do this separately because we may have multiple locallock entries
2125          * pointing to the same proclock, and we daren't end up with any dangling
2126          * pointers.  Fast-path locks are cleaned up during the locallock table
2127          * scan, though.
2128          */
2129         hash_seq_init(&status, LockMethodLocalHash);
2130
2131         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2132         {
2133                 /*
2134                  * If the LOCALLOCK entry is unused, we must've run out of shared
2135                  * memory while trying to set up this lock.  Just forget the local
2136                  * entry.
2137                  */
2138                 if (locallock->nLocks == 0)
2139                 {
2140                         RemoveLocalLock(locallock);
2141                         continue;
2142                 }
2143
2144                 /* Ignore items that are not of the lockmethod to be removed */
2145                 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2146                         continue;
2147
2148                 /*
2149                  * If we are asked to release all locks, we can just zap the entry.
2150                  * Otherwise, must scan to see if there are session locks. We assume
2151                  * there is at most one lockOwners entry for session locks.
2152                  */
2153                 if (!allLocks)
2154                 {
2155                         LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
2156
2157                         /* If session lock is above array position 0, move it down to 0 */
2158                         for (i = 0; i < locallock->numLockOwners; i++)
2159                         {
2160                                 if (lockOwners[i].owner == NULL)
2161                                         lockOwners[0] = lockOwners[i];
2162                                 else
2163                                         ResourceOwnerForgetLock(lockOwners[i].owner, locallock);
2164                         }
2165
2166                         if (locallock->numLockOwners > 0 &&
2167                                 lockOwners[0].owner == NULL &&
2168                                 lockOwners[0].nLocks > 0)
2169                         {
2170                                 /* Fix the locallock to show just the session locks */
2171                                 locallock->nLocks = lockOwners[0].nLocks;
2172                                 locallock->numLockOwners = 1;
2173                                 /* We aren't deleting this locallock, so done */
2174                                 continue;
2175                         }
2176                         else
2177                                 locallock->numLockOwners = 0;
2178                 }
2179
2180                 /*
2181                  * If the lock or proclock pointers are NULL, this lock was taken via
2182                  * the relation fast-path (and is not known to have been transferred).
2183                  */
2184                 if (locallock->proclock == NULL || locallock->lock == NULL)
2185                 {
2186                         LOCKMODE        lockmode = locallock->tag.mode;
2187                         Oid                     relid;
2188
2189                         /* Verify that a fast-path lock is what we've got. */
2190                         if (!EligibleForRelationFastPath(&locallock->tag.lock, lockmode))
2191                                 elog(PANIC, "locallock table corrupted");
2192
2193                         /*
2194                          * If we don't currently hold the LWLock that protects our
2195                          * fast-path data structures, we must acquire it before attempting
2196                          * to release the lock via the fast-path.  We will continue to
2197                          * hold the LWLock until we're done scanning the locallock table,
2198                          * unless we hit a transferred fast-path lock.  (XXX is this
2199                          * really such a good idea?  There could be a lot of entries ...)
2200                          */
2201                         if (!have_fast_path_lwlock)
2202                         {
2203                                 LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2204                                 have_fast_path_lwlock = true;
2205                         }
2206
2207                         /* Attempt fast-path release. */
2208                         relid = locallock->tag.lock.locktag_field2;
2209                         if (FastPathUnGrantRelationLock(relid, lockmode))
2210                         {
2211                                 RemoveLocalLock(locallock);
2212                                 continue;
2213                         }
2214
2215                         /*
2216                          * Our lock, originally taken via the fast path, has been
2217                          * transferred to the main lock table.  That's going to require
2218                          * some extra work, so release our fast-path lock before starting.
2219                          */
2220                         LWLockRelease(&MyProc->backendLock);
2221                         have_fast_path_lwlock = false;
2222
2223                         /*
2224                          * Now dump the lock.  We haven't got a pointer to the LOCK or
2225                          * PROCLOCK in this case, so we have to handle this a bit
2226                          * differently than a normal lock release.  Unfortunately, this
2227                          * requires an extra LWLock acquire-and-release cycle on the
2228                          * partitionLock, but hopefully it shouldn't happen often.
2229                          */
2230                         LockRefindAndRelease(lockMethodTable, MyProc,
2231                                                                  &locallock->tag.lock, lockmode, false);
2232                         RemoveLocalLock(locallock);
2233                         continue;
2234                 }
2235
2236                 /* Mark the proclock to show we need to release this lockmode */
2237                 if (locallock->nLocks > 0)
2238                         locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
2239
2240                 /* And remove the locallock hashtable entry */
2241                 RemoveLocalLock(locallock);
2242         }
2243
2244         /* Done with the fast-path data structures */
2245         if (have_fast_path_lwlock)
2246                 LWLockRelease(&MyProc->backendLock);
2247
2248         /*
2249          * Now, scan each lock partition separately.
2250          */
2251         for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
2252         {
2253                 LWLock     *partitionLock;
2254                 SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
2255                 PROCLOCK   *nextplock;
2256
2257                 partitionLock = LockHashPartitionLockByIndex(partition);
2258
2259                 /*
2260                  * If the proclock list for this partition is empty, we can skip
2261                  * acquiring the partition lock.  This optimization is trickier than
2262                  * it looks, because another backend could be in process of adding
2263                  * something to our proclock list due to promoting one of our
2264                  * fast-path locks.  However, any such lock must be one that we
2265                  * decided not to delete above, so it's okay to skip it again now;
2266                  * we'd just decide not to delete it again.  We must, however, be
2267                  * careful to re-fetch the list header once we've acquired the
2268                  * partition lock, to be sure we have a valid, up-to-date pointer.
2269                  * (There is probably no significant risk if pointer fetch/store is
2270                  * atomic, but we don't wish to assume that.)
2271                  *
2272                  * XXX This argument assumes that the locallock table correctly
2273                  * represents all of our fast-path locks.  While allLocks mode
2274                  * guarantees to clean up all of our normal locks regardless of the
2275                  * locallock situation, we lose that guarantee for fast-path locks.
2276                  * This is not ideal.
2277                  */
2278                 if (SHMQueueNext(procLocks, procLocks,
2279                                                  offsetof(PROCLOCK, procLink)) == NULL)
2280                         continue;                       /* needn't examine this partition */
2281
2282                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2283
2284                 for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2285                                                                                                   offsetof(PROCLOCK, procLink));
2286                          proclock;
2287                          proclock = nextplock)
2288                 {
2289                         bool            wakeupNeeded = false;
2290
2291                         /* Get link first, since we may unlink/delete this proclock */
2292                         nextplock = (PROCLOCK *)
2293                                 SHMQueueNext(procLocks, &proclock->procLink,
2294                                                          offsetof(PROCLOCK, procLink));
2295
2296                         Assert(proclock->tag.myProc == MyProc);
2297
2298                         lock = proclock->tag.myLock;
2299
2300                         /* Ignore items that are not of the lockmethod to be removed */
2301                         if (LOCK_LOCKMETHOD(*lock) != lockmethodid)
2302                                 continue;
2303
2304                         /*
2305                          * In allLocks mode, force release of all locks even if locallock
2306                          * table had problems
2307                          */
2308                         if (allLocks)
2309                                 proclock->releaseMask = proclock->holdMask;
2310                         else
2311                                 Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
2312
2313                         /*
2314                          * Ignore items that have nothing to be released, unless they have
2315                          * holdMask == 0 and are therefore recyclable
2316                          */
2317                         if (proclock->releaseMask == 0 && proclock->holdMask != 0)
2318                                 continue;
2319
2320                         PROCLOCK_PRINT("LockReleaseAll", proclock);
2321                         LOCK_PRINT("LockReleaseAll", lock, 0);
2322                         Assert(lock->nRequested >= 0);
2323                         Assert(lock->nGranted >= 0);
2324                         Assert(lock->nGranted <= lock->nRequested);
2325                         Assert((proclock->holdMask & ~lock->grantMask) == 0);
2326
2327                         /*
2328                          * Release the previously-marked lock modes
2329                          */
2330                         for (i = 1; i <= numLockModes; i++)
2331                         {
2332                                 if (proclock->releaseMask & LOCKBIT_ON(i))
2333                                         wakeupNeeded |= UnGrantLock(lock, i, proclock,
2334                                                                                                 lockMethodTable);
2335                         }
2336                         Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
2337                         Assert(lock->nGranted <= lock->nRequested);
2338                         LOCK_PRINT("LockReleaseAll: updated", lock, 0);
2339
2340                         proclock->releaseMask = 0;
2341
2342                         /* CleanUpLock will wake up waiters if needed. */
2343                         CleanUpLock(lock, proclock,
2344                                                 lockMethodTable,
2345                                                 LockTagHashCode(&lock->tag),
2346                                                 wakeupNeeded);
2347                 }                                               /* loop over PROCLOCKs within this partition */
2348
2349                 LWLockRelease(partitionLock);
2350         }                                                       /* loop over partitions */
2351
2352 #ifdef LOCK_DEBUG
2353         if (*(lockMethodTable->trace_flag))
2354                 elog(LOG, "LockReleaseAll done");
2355 #endif
2356 }
2357
2358 /*
2359  * LockReleaseSession -- Release all session locks of the specified lock method
2360  *              that are held by the current process.
2361  */
2362 void
2363 LockReleaseSession(LOCKMETHODID lockmethodid)
2364 {
2365         HASH_SEQ_STATUS status;
2366         LOCALLOCK  *locallock;
2367
2368         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2369                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2370
2371         hash_seq_init(&status, LockMethodLocalHash);
2372
2373         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2374         {
2375                 /* Ignore items that are not of the specified lock method */
2376                 if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid)
2377                         continue;
2378
2379                 ReleaseLockIfHeld(locallock, true);
2380         }
2381 }
2382
2383 /*
2384  * LockReleaseCurrentOwner
2385  *              Release all locks belonging to CurrentResourceOwner
2386  *
2387  * If the caller knows what those locks are, it can pass them as an array.
2388  * That speeds up the call significantly, when a lot of locks are held.
2389  * Otherwise, pass NULL for locallocks, and we'll traverse through our hash
2390  * table to find them.
2391  */
2392 void
2393 LockReleaseCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2394 {
2395         if (locallocks == NULL)
2396         {
2397                 HASH_SEQ_STATUS status;
2398                 LOCALLOCK  *locallock;
2399
2400                 hash_seq_init(&status, LockMethodLocalHash);
2401
2402                 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2403                         ReleaseLockIfHeld(locallock, false);
2404         }
2405         else
2406         {
2407                 int                     i;
2408
2409                 for (i = nlocks - 1; i >= 0; i--)
2410                         ReleaseLockIfHeld(locallocks[i], false);
2411         }
2412 }
2413
2414 /*
2415  * ReleaseLockIfHeld
2416  *              Release any session-level locks on this lockable object if sessionLock
2417  *              is true; else, release any locks held by CurrentResourceOwner.
2418  *
2419  * It is tempting to pass this a ResourceOwner pointer (or NULL for session
2420  * locks), but without refactoring LockRelease() we cannot support releasing
2421  * locks belonging to resource owners other than CurrentResourceOwner.
2422  * If we were to refactor, it'd be a good idea to fix it so we don't have to
2423  * do a hashtable lookup of the locallock, too.  However, currently this
2424  * function isn't used heavily enough to justify refactoring for its
2425  * convenience.
2426  */
2427 static void
2428 ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock)
2429 {
2430         ResourceOwner owner;
2431         LOCALLOCKOWNER *lockOwners;
2432         int                     i;
2433
2434         /* Identify owner for lock (must match LockRelease!) */
2435         if (sessionLock)
2436                 owner = NULL;
2437         else
2438                 owner = CurrentResourceOwner;
2439
2440         /* Scan to see if there are any locks belonging to the target owner */
2441         lockOwners = locallock->lockOwners;
2442         for (i = locallock->numLockOwners - 1; i >= 0; i--)
2443         {
2444                 if (lockOwners[i].owner == owner)
2445                 {
2446                         Assert(lockOwners[i].nLocks > 0);
2447                         if (lockOwners[i].nLocks < locallock->nLocks)
2448                         {
2449                                 /*
2450                                  * We will still hold this lock after forgetting this
2451                                  * ResourceOwner.
2452                                  */
2453                                 locallock->nLocks -= lockOwners[i].nLocks;
2454                                 /* compact out unused slot */
2455                                 locallock->numLockOwners--;
2456                                 if (owner != NULL)
2457                                         ResourceOwnerForgetLock(owner, locallock);
2458                                 if (i < locallock->numLockOwners)
2459                                         lockOwners[i] = lockOwners[locallock->numLockOwners];
2460                         }
2461                         else
2462                         {
2463                                 Assert(lockOwners[i].nLocks == locallock->nLocks);
2464                                 /* We want to call LockRelease just once */
2465                                 lockOwners[i].nLocks = 1;
2466                                 locallock->nLocks = 1;
2467                                 if (!LockRelease(&locallock->tag.lock,
2468                                                                  locallock->tag.mode,
2469                                                                  sessionLock))
2470                                         elog(WARNING, "ReleaseLockIfHeld: failed??");
2471                         }
2472                         break;
2473                 }
2474         }
2475 }
2476
2477 /*
2478  * LockReassignCurrentOwner
2479  *              Reassign all locks belonging to CurrentResourceOwner to belong
2480  *              to its parent resource owner.
2481  *
2482  * If the caller knows what those locks are, it can pass them as an array.
2483  * That speeds up the call significantly, when a lot of locks are held
2484  * (e.g pg_dump with a large schema).  Otherwise, pass NULL for locallocks,
2485  * and we'll traverse through our hash table to find them.
2486  */
2487 void
2488 LockReassignCurrentOwner(LOCALLOCK **locallocks, int nlocks)
2489 {
2490         ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner);
2491
2492         Assert(parent != NULL);
2493
2494         if (locallocks == NULL)
2495         {
2496                 HASH_SEQ_STATUS status;
2497                 LOCALLOCK  *locallock;
2498
2499                 hash_seq_init(&status, LockMethodLocalHash);
2500
2501                 while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
2502                         LockReassignOwner(locallock, parent);
2503         }
2504         else
2505         {
2506                 int                     i;
2507
2508                 for (i = nlocks - 1; i >= 0; i--)
2509                         LockReassignOwner(locallocks[i], parent);
2510         }
2511 }
2512
2513 /*
2514  * Subroutine of LockReassignCurrentOwner. Reassigns a given lock belonging to
2515  * CurrentResourceOwner to its parent.
2516  */
2517 static void
2518 LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent)
2519 {
2520         LOCALLOCKOWNER *lockOwners;
2521         int                     i;
2522         int                     ic = -1;
2523         int                     ip = -1;
2524
2525         /*
2526          * Scan to see if there are any locks belonging to current owner or its
2527          * parent
2528          */
2529         lockOwners = locallock->lockOwners;
2530         for (i = locallock->numLockOwners - 1; i >= 0; i--)
2531         {
2532                 if (lockOwners[i].owner == CurrentResourceOwner)
2533                         ic = i;
2534                 else if (lockOwners[i].owner == parent)
2535                         ip = i;
2536         }
2537
2538         if (ic < 0)
2539                 return;                                 /* no current locks */
2540
2541         if (ip < 0)
2542         {
2543                 /* Parent has no slot, so just give it the child's slot */
2544                 lockOwners[ic].owner = parent;
2545                 ResourceOwnerRememberLock(parent, locallock);
2546         }
2547         else
2548         {
2549                 /* Merge child's count with parent's */
2550                 lockOwners[ip].nLocks += lockOwners[ic].nLocks;
2551                 /* compact out unused slot */
2552                 locallock->numLockOwners--;
2553                 if (ic < locallock->numLockOwners)
2554                         lockOwners[ic] = lockOwners[locallock->numLockOwners];
2555         }
2556         ResourceOwnerForgetLock(CurrentResourceOwner, locallock);
2557 }
2558
2559 /*
2560  * FastPathGrantRelationLock
2561  *              Grant lock using per-backend fast-path array, if there is space.
2562  */
2563 static bool
2564 FastPathGrantRelationLock(Oid relid, LOCKMODE lockmode)
2565 {
2566         uint32          f;
2567         uint32          unused_slot = FP_LOCK_SLOTS_PER_BACKEND;
2568
2569         /* Scan for existing entry for this relid, remembering empty slot. */
2570         for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2571         {
2572                 if (FAST_PATH_GET_BITS(MyProc, f) == 0)
2573                         unused_slot = f;
2574                 else if (MyProc->fpRelId[f] == relid)
2575                 {
2576                         Assert(!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode));
2577                         FAST_PATH_SET_LOCKMODE(MyProc, f, lockmode);
2578                         return true;
2579                 }
2580         }
2581
2582         /* If no existing entry, use any empty slot. */
2583         if (unused_slot < FP_LOCK_SLOTS_PER_BACKEND)
2584         {
2585                 MyProc->fpRelId[unused_slot] = relid;
2586                 FAST_PATH_SET_LOCKMODE(MyProc, unused_slot, lockmode);
2587                 ++FastPathLocalUseCount;
2588                 return true;
2589         }
2590
2591         /* No existing entry, and no empty slot. */
2592         return false;
2593 }
2594
2595 /*
2596  * FastPathUnGrantRelationLock
2597  *              Release fast-path lock, if present.  Update backend-private local
2598  *              use count, while we're at it.
2599  */
2600 static bool
2601 FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode)
2602 {
2603         uint32          f;
2604         bool            result = false;
2605
2606         FastPathLocalUseCount = 0;
2607         for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2608         {
2609                 if (MyProc->fpRelId[f] == relid
2610                         && FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2611                 {
2612                         Assert(!result);
2613                         FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2614                         result = true;
2615                         /* we continue iterating so as to update FastPathLocalUseCount */
2616                 }
2617                 if (FAST_PATH_GET_BITS(MyProc, f) != 0)
2618                         ++FastPathLocalUseCount;
2619         }
2620         return result;
2621 }
2622
2623 /*
2624  * FastPathTransferRelationLocks
2625  *              Transfer locks matching the given lock tag from per-backend fast-path
2626  *              arrays to the shared hash table.
2627  *
2628  * Returns true if successful, false if ran out of shared memory.
2629  */
2630 static bool
2631 FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag,
2632                                                           uint32 hashcode)
2633 {
2634         LWLock     *partitionLock = LockHashPartitionLock(hashcode);
2635         Oid                     relid = locktag->locktag_field2;
2636         uint32          i;
2637
2638         /*
2639          * Every PGPROC that can potentially hold a fast-path lock is present in
2640          * ProcGlobal->allProcs.  Prepared transactions are not, but any
2641          * outstanding fast-path locks held by prepared transactions are
2642          * transferred to the main lock table.
2643          */
2644         for (i = 0; i < ProcGlobal->allProcCount; i++)
2645         {
2646                 PGPROC     *proc = &ProcGlobal->allProcs[i];
2647                 uint32          f;
2648
2649                 LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
2650
2651                 /*
2652                  * If the target backend isn't referencing the same database as the
2653                  * lock, then we needn't examine the individual relation IDs at all;
2654                  * none of them can be relevant.
2655                  *
2656                  * proc->databaseId is set at backend startup time and never changes
2657                  * thereafter, so it might be safe to perform this test before
2658                  * acquiring &proc->backendLock.  In particular, it's certainly safe
2659                  * to assume that if the target backend holds any fast-path locks, it
2660                  * must have performed a memory-fencing operation (in particular, an
2661                  * LWLock acquisition) since setting proc->databaseId.  However, it's
2662                  * less clear that our backend is certain to have performed a memory
2663                  * fencing operation since the other backend set proc->databaseId.  So
2664                  * for now, we test it after acquiring the LWLock just to be safe.
2665                  */
2666                 if (proc->databaseId != locktag->locktag_field1)
2667                 {
2668                         LWLockRelease(&proc->backendLock);
2669                         continue;
2670                 }
2671
2672                 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2673                 {
2674                         uint32          lockmode;
2675
2676                         /* Look for an allocated slot matching the given relid. */
2677                         if (relid != proc->fpRelId[f] || FAST_PATH_GET_BITS(proc, f) == 0)
2678                                 continue;
2679
2680                         /* Find or create lock object. */
2681                         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2682                         for (lockmode = FAST_PATH_LOCKNUMBER_OFFSET;
2683                                  lockmode < FAST_PATH_LOCKNUMBER_OFFSET + FAST_PATH_BITS_PER_SLOT;
2684                                  ++lockmode)
2685                         {
2686                                 PROCLOCK   *proclock;
2687
2688                                 if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode))
2689                                         continue;
2690                                 proclock = SetupLockInTable(lockMethodTable, proc, locktag,
2691                                                                                         hashcode, lockmode);
2692                                 if (!proclock)
2693                                 {
2694                                         LWLockRelease(partitionLock);
2695                                         LWLockRelease(&proc->backendLock);
2696                                         return false;
2697                                 }
2698                                 GrantLock(proclock->tag.myLock, proclock, lockmode);
2699                                 FAST_PATH_CLEAR_LOCKMODE(proc, f, lockmode);
2700                         }
2701                         LWLockRelease(partitionLock);
2702
2703                         /* No need to examine remaining slots. */
2704                         break;
2705                 }
2706                 LWLockRelease(&proc->backendLock);
2707         }
2708         return true;
2709 }
2710
2711 /*
2712  * FastPathGetRelationLockEntry
2713  *              Return the PROCLOCK for a lock originally taken via the fast-path,
2714  *              transferring it to the primary lock table if necessary.
2715  *
2716  * Note: caller takes care of updating the locallock object.
2717  */
2718 static PROCLOCK *
2719 FastPathGetRelationLockEntry(LOCALLOCK *locallock)
2720 {
2721         LockMethod      lockMethodTable = LockMethods[DEFAULT_LOCKMETHOD];
2722         LOCKTAG    *locktag = &locallock->tag.lock;
2723         PROCLOCK   *proclock = NULL;
2724         LWLock     *partitionLock = LockHashPartitionLock(locallock->hashcode);
2725         Oid                     relid = locktag->locktag_field2;
2726         uint32          f;
2727
2728         LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
2729
2730         for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2731         {
2732                 uint32          lockmode;
2733
2734                 /* Look for an allocated slot matching the given relid. */
2735                 if (relid != MyProc->fpRelId[f] || FAST_PATH_GET_BITS(MyProc, f) == 0)
2736                         continue;
2737
2738                 /* If we don't have a lock of the given mode, forget it! */
2739                 lockmode = locallock->tag.mode;
2740                 if (!FAST_PATH_CHECK_LOCKMODE(MyProc, f, lockmode))
2741                         break;
2742
2743                 /* Find or create lock object. */
2744                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
2745
2746                 proclock = SetupLockInTable(lockMethodTable, MyProc, locktag,
2747                                                                         locallock->hashcode, lockmode);
2748                 if (!proclock)
2749                 {
2750                         LWLockRelease(partitionLock);
2751                         LWLockRelease(&MyProc->backendLock);
2752                         ereport(ERROR,
2753                                         (errcode(ERRCODE_OUT_OF_MEMORY),
2754                                          errmsg("out of shared memory"),
2755                                          errhint("You might need to increase max_locks_per_transaction.")));
2756                 }
2757                 GrantLock(proclock->tag.myLock, proclock, lockmode);
2758                 FAST_PATH_CLEAR_LOCKMODE(MyProc, f, lockmode);
2759
2760                 LWLockRelease(partitionLock);
2761
2762                 /* No need to examine remaining slots. */
2763                 break;
2764         }
2765
2766         LWLockRelease(&MyProc->backendLock);
2767
2768         /* Lock may have already been transferred by some other backend. */
2769         if (proclock == NULL)
2770         {
2771                 LOCK       *lock;
2772                 PROCLOCKTAG proclocktag;
2773                 uint32          proclock_hashcode;
2774
2775                 LWLockAcquire(partitionLock, LW_SHARED);
2776
2777                 lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2778                                                                                                         (void *) locktag,
2779                                                                                                         locallock->hashcode,
2780                                                                                                         HASH_FIND,
2781                                                                                                         NULL);
2782                 if (!lock)
2783                         elog(ERROR, "failed to re-find shared lock object");
2784
2785                 proclocktag.myLock = lock;
2786                 proclocktag.myProc = MyProc;
2787
2788                 proclock_hashcode = ProcLockHashCode(&proclocktag, locallock->hashcode);
2789                 proclock = (PROCLOCK *)
2790                         hash_search_with_hash_value(LockMethodProcLockHash,
2791                                                                                 (void *) &proclocktag,
2792                                                                                 proclock_hashcode,
2793                                                                                 HASH_FIND,
2794                                                                                 NULL);
2795                 if (!proclock)
2796                         elog(ERROR, "failed to re-find shared proclock object");
2797                 LWLockRelease(partitionLock);
2798         }
2799
2800         return proclock;
2801 }
2802
2803 /*
2804  * GetLockConflicts
2805  *              Get an array of VirtualTransactionIds of xacts currently holding locks
2806  *              that would conflict with the specified lock/lockmode.
2807  *              xacts merely awaiting such a lock are NOT reported.
2808  *
2809  * The result array is palloc'd and is terminated with an invalid VXID.
2810  * *countp, if not null, is updated to the number of items set.
2811  *
2812  * Of course, the result could be out of date by the time it's returned,
2813  * so use of this function has to be thought about carefully.
2814  *
2815  * Note we never include the current xact's vxid in the result array,
2816  * since an xact never blocks itself.  Also, prepared transactions are
2817  * ignored, which is a bit more debatable but is appropriate for current
2818  * uses of the result.
2819  */
2820 VirtualTransactionId *
2821 GetLockConflicts(const LOCKTAG *locktag, LOCKMODE lockmode, int *countp)
2822 {
2823         static VirtualTransactionId *vxids;
2824         LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
2825         LockMethod      lockMethodTable;
2826         LOCK       *lock;
2827         LOCKMASK        conflictMask;
2828         SHM_QUEUE  *procLocks;
2829         PROCLOCK   *proclock;
2830         uint32          hashcode;
2831         LWLock     *partitionLock;
2832         int                     count = 0;
2833         int                     fast_count = 0;
2834
2835         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
2836                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
2837         lockMethodTable = LockMethods[lockmethodid];
2838         if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
2839                 elog(ERROR, "unrecognized lock mode: %d", lockmode);
2840
2841         /*
2842          * Allocate memory to store results, and fill with InvalidVXID.  We only
2843          * need enough space for MaxBackends + a terminator, since prepared xacts
2844          * don't count. InHotStandby allocate once in TopMemoryContext.
2845          */
2846         if (InHotStandby)
2847         {
2848                 if (vxids == NULL)
2849                         vxids = (VirtualTransactionId *)
2850                                 MemoryContextAlloc(TopMemoryContext,
2851                                                                    sizeof(VirtualTransactionId) * (MaxBackends + 1));
2852         }
2853         else
2854                 vxids = (VirtualTransactionId *)
2855                         palloc0(sizeof(VirtualTransactionId) * (MaxBackends + 1));
2856
2857         /* Compute hash code and partition lock, and look up conflicting modes. */
2858         hashcode = LockTagHashCode(locktag);
2859         partitionLock = LockHashPartitionLock(hashcode);
2860         conflictMask = lockMethodTable->conflictTab[lockmode];
2861
2862         /*
2863          * Fast path locks might not have been entered in the primary lock table.
2864          * If the lock we're dealing with could conflict with such a lock, we must
2865          * examine each backend's fast-path array for conflicts.
2866          */
2867         if (ConflictsWithRelationFastPath(locktag, lockmode))
2868         {
2869                 int                     i;
2870                 Oid                     relid = locktag->locktag_field2;
2871                 VirtualTransactionId vxid;
2872
2873                 /*
2874                  * Iterate over relevant PGPROCs.  Anything held by a prepared
2875                  * transaction will have been transferred to the primary lock table,
2876                  * so we need not worry about those.  This is all a bit fuzzy, because
2877                  * new locks could be taken after we've visited a particular
2878                  * partition, but the callers had better be prepared to deal with that
2879                  * anyway, since the locks could equally well be taken between the
2880                  * time we return the value and the time the caller does something
2881                  * with it.
2882                  */
2883                 for (i = 0; i < ProcGlobal->allProcCount; i++)
2884                 {
2885                         PGPROC     *proc = &ProcGlobal->allProcs[i];
2886                         uint32          f;
2887
2888                         /* A backend never blocks itself */
2889                         if (proc == MyProc)
2890                                 continue;
2891
2892                         LWLockAcquire(&proc->backendLock, LW_SHARED);
2893
2894                         /*
2895                          * If the target backend isn't referencing the same database as
2896                          * the lock, then we needn't examine the individual relation IDs
2897                          * at all; none of them can be relevant.
2898                          *
2899                          * See FastPathTransferRelationLocks() for discussion of why we do
2900                          * this test after acquiring the lock.
2901                          */
2902                         if (proc->databaseId != locktag->locktag_field1)
2903                         {
2904                                 LWLockRelease(&proc->backendLock);
2905                                 continue;
2906                         }
2907
2908                         for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; f++)
2909                         {
2910                                 uint32          lockmask;
2911
2912                                 /* Look for an allocated slot matching the given relid. */
2913                                 if (relid != proc->fpRelId[f])
2914                                         continue;
2915                                 lockmask = FAST_PATH_GET_BITS(proc, f);
2916                                 if (!lockmask)
2917                                         continue;
2918                                 lockmask <<= FAST_PATH_LOCKNUMBER_OFFSET;
2919
2920                                 /*
2921                                  * There can only be one entry per relation, so if we found it
2922                                  * and it doesn't conflict, we can skip the rest of the slots.
2923                                  */
2924                                 if ((lockmask & conflictMask) == 0)
2925                                         break;
2926
2927                                 /* Conflict! */
2928                                 GET_VXID_FROM_PGPROC(vxid, *proc);
2929
2930                                 /*
2931                                  * If we see an invalid VXID, then either the xact has already
2932                                  * committed (or aborted), or it's a prepared xact.  In either
2933                                  * case we may ignore it.
2934                                  */
2935                                 if (VirtualTransactionIdIsValid(vxid))
2936                                         vxids[count++] = vxid;
2937
2938                                 /* No need to examine remaining slots. */
2939                                 break;
2940                         }
2941
2942                         LWLockRelease(&proc->backendLock);
2943                 }
2944         }
2945
2946         /* Remember how many fast-path conflicts we found. */
2947         fast_count = count;
2948
2949         /*
2950          * Look up the lock object matching the tag.
2951          */
2952         LWLockAcquire(partitionLock, LW_SHARED);
2953
2954         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
2955                                                                                                 (const void *) locktag,
2956                                                                                                 hashcode,
2957                                                                                                 HASH_FIND,
2958                                                                                                 NULL);
2959         if (!lock)
2960         {
2961                 /*
2962                  * If the lock object doesn't exist, there is nothing holding a lock
2963                  * on this lockable object.
2964                  */
2965                 LWLockRelease(partitionLock);
2966                 vxids[count].backendId = InvalidBackendId;
2967                 vxids[count].localTransactionId = InvalidLocalTransactionId;
2968                 if (countp)
2969                         *countp = count;
2970                 return vxids;
2971         }
2972
2973         /*
2974          * Examine each existing holder (or awaiter) of the lock.
2975          */
2976
2977         procLocks = &(lock->procLocks);
2978
2979         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
2980                                                                                  offsetof(PROCLOCK, lockLink));
2981
2982         while (proclock)
2983         {
2984                 if (conflictMask & proclock->holdMask)
2985                 {
2986                         PGPROC     *proc = proclock->tag.myProc;
2987
2988                         /* A backend never blocks itself */
2989                         if (proc != MyProc)
2990                         {
2991                                 VirtualTransactionId vxid;
2992
2993                                 GET_VXID_FROM_PGPROC(vxid, *proc);
2994
2995                                 /*
2996                                  * If we see an invalid VXID, then either the xact has already
2997                                  * committed (or aborted), or it's a prepared xact.  In either
2998                                  * case we may ignore it.
2999                                  */
3000                                 if (VirtualTransactionIdIsValid(vxid))
3001                                 {
3002                                         int                     i;
3003
3004                                         /* Avoid duplicate entries. */
3005                                         for (i = 0; i < fast_count; ++i)
3006                                                 if (VirtualTransactionIdEquals(vxids[i], vxid))
3007                                                         break;
3008                                         if (i >= fast_count)
3009                                                 vxids[count++] = vxid;
3010                                 }
3011                         }
3012                 }
3013
3014                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
3015                                                                                          offsetof(PROCLOCK, lockLink));
3016         }
3017
3018         LWLockRelease(partitionLock);
3019
3020         if (count > MaxBackends)        /* should never happen */
3021                 elog(PANIC, "too many conflicting locks found");
3022
3023         vxids[count].backendId = InvalidBackendId;
3024         vxids[count].localTransactionId = InvalidLocalTransactionId;
3025         if (countp)
3026                 *countp = count;
3027         return vxids;
3028 }
3029
3030 /*
3031  * Find a lock in the shared lock table and release it.  It is the caller's
3032  * responsibility to verify that this is a sane thing to do.  (For example, it
3033  * would be bad to release a lock here if there might still be a LOCALLOCK
3034  * object with pointers to it.)
3035  *
3036  * We currently use this in two situations: first, to release locks held by
3037  * prepared transactions on commit (see lock_twophase_postcommit); and second,
3038  * to release locks taken via the fast-path, transferred to the main hash
3039  * table, and then released (see LockReleaseAll).
3040  */
3041 static void
3042 LockRefindAndRelease(LockMethod lockMethodTable, PGPROC *proc,
3043                                          LOCKTAG *locktag, LOCKMODE lockmode,
3044                                          bool decrement_strong_lock_count)
3045 {
3046         LOCK       *lock;
3047         PROCLOCK   *proclock;
3048         PROCLOCKTAG proclocktag;
3049         uint32          hashcode;
3050         uint32          proclock_hashcode;
3051         LWLock     *partitionLock;
3052         bool            wakeupNeeded;
3053
3054         hashcode = LockTagHashCode(locktag);
3055         partitionLock = LockHashPartitionLock(hashcode);
3056
3057         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3058
3059         /*
3060          * Re-find the lock object (it had better be there).
3061          */
3062         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
3063                                                                                                 (void *) locktag,
3064                                                                                                 hashcode,
3065                                                                                                 HASH_FIND,
3066                                                                                                 NULL);
3067         if (!lock)
3068                 elog(PANIC, "failed to re-find shared lock object");
3069
3070         /*
3071          * Re-find the proclock object (ditto).
3072          */
3073         proclocktag.myLock = lock;
3074         proclocktag.myProc = proc;
3075
3076         proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
3077
3078         proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
3079                                                                                                                 (void *) &proclocktag,
3080                                                                                                                 proclock_hashcode,
3081                                                                                                                 HASH_FIND,
3082                                                                                                                 NULL);
3083         if (!proclock)
3084                 elog(PANIC, "failed to re-find shared proclock object");
3085
3086         /*
3087          * Double-check that we are actually holding a lock of the type we want to
3088          * release.
3089          */
3090         if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
3091         {
3092                 PROCLOCK_PRINT("lock_twophase_postcommit: WRONGTYPE", proclock);
3093                 LWLockRelease(partitionLock);
3094                 elog(WARNING, "you don't own a lock of type %s",
3095                          lockMethodTable->lockModeNames[lockmode]);
3096                 return;
3097         }
3098
3099         /*
3100          * Do the releasing.  CleanUpLock will waken any now-wakable waiters.
3101          */
3102         wakeupNeeded = UnGrantLock(lock, lockmode, proclock, lockMethodTable);
3103
3104         CleanUpLock(lock, proclock,
3105                                 lockMethodTable, hashcode,
3106                                 wakeupNeeded);
3107
3108         LWLockRelease(partitionLock);
3109
3110         /*
3111          * Decrement strong lock count.  This logic is needed only for 2PC.
3112          */
3113         if (decrement_strong_lock_count
3114                 && ConflictsWithRelationFastPath(locktag, lockmode))
3115         {
3116                 uint32          fasthashcode = FastPathStrongLockHashPartition(hashcode);
3117
3118                 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
3119                 Assert(FastPathStrongRelationLocks->count[fasthashcode] > 0);
3120                 FastPathStrongRelationLocks->count[fasthashcode]--;
3121                 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
3122         }
3123 }
3124
3125 /*
3126  * AtPrepare_Locks
3127  *              Do the preparatory work for a PREPARE: make 2PC state file records
3128  *              for all locks currently held.
3129  *
3130  * Session-level locks are ignored, as are VXID locks.
3131  *
3132  * There are some special cases that we error out on: we can't be holding any
3133  * locks at both session and transaction level (since we must either keep or
3134  * give away the PROCLOCK object), and we can't be holding any locks on
3135  * temporary objects (since that would mess up the current backend if it tries
3136  * to exit before the prepared xact is committed).
3137  */
3138 void
3139 AtPrepare_Locks(void)
3140 {
3141         HASH_SEQ_STATUS status;
3142         LOCALLOCK  *locallock;
3143
3144         /*
3145          * For the most part, we don't need to touch shared memory for this ---
3146          * all the necessary state information is in the locallock table.
3147          * Fast-path locks are an exception, however: we move any such locks to
3148          * the main table before allowing PREPARE TRANSACTION to succeed.
3149          */
3150         hash_seq_init(&status, LockMethodLocalHash);
3151
3152         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3153         {
3154                 TwoPhaseLockRecord record;
3155                 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3156                 bool            haveSessionLock;
3157                 bool            haveXactLock;
3158                 int                     i;
3159
3160                 /*
3161                  * Ignore VXID locks.  We don't want those to be held by prepared
3162                  * transactions, since they aren't meaningful after a restart.
3163                  */
3164                 if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3165                         continue;
3166
3167                 /* Ignore it if we don't actually hold the lock */
3168                 if (locallock->nLocks <= 0)
3169                         continue;
3170
3171                 /* Scan to see whether we hold it at session or transaction level */
3172                 haveSessionLock = haveXactLock = false;
3173                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
3174                 {
3175                         if (lockOwners[i].owner == NULL)
3176                                 haveSessionLock = true;
3177                         else
3178                                 haveXactLock = true;
3179                 }
3180
3181                 /* Ignore it if we have only session lock */
3182                 if (!haveXactLock)
3183                         continue;
3184
3185                 /*
3186                  * If we have both session- and transaction-level locks, fail.  This
3187                  * should never happen with regular locks, since we only take those at
3188                  * session level in some special operations like VACUUM.  It's
3189                  * possible to hit this with advisory locks, though.
3190                  *
3191                  * It would be nice if we could keep the session hold and give away
3192                  * the transactional hold to the prepared xact.  However, that would
3193                  * require two PROCLOCK objects, and we cannot be sure that another
3194                  * PROCLOCK will be available when it comes time for PostPrepare_Locks
3195                  * to do the deed.  So for now, we error out while we can still do so
3196                  * safely.
3197                  */
3198                 if (haveSessionLock)
3199                         ereport(ERROR,
3200                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3201                                          errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3202
3203                 /*
3204                  * If the local lock was taken via the fast-path, we need to move it
3205                  * to the primary lock table, or just get a pointer to the existing
3206                  * primary lock table entry if by chance it's already been
3207                  * transferred.
3208                  */
3209                 if (locallock->proclock == NULL)
3210                 {
3211                         locallock->proclock = FastPathGetRelationLockEntry(locallock);
3212                         locallock->lock = locallock->proclock->tag.myLock;
3213                 }
3214
3215                 /*
3216                  * Arrange to not release any strong lock count held by this lock
3217                  * entry.  We must retain the count until the prepared transaction is
3218                  * committed or rolled back.
3219                  */
3220                 locallock->holdsStrongLockCount = false;
3221
3222                 /*
3223                  * Create a 2PC record.
3224                  */
3225                 memcpy(&(record.locktag), &(locallock->tag.lock), sizeof(LOCKTAG));
3226                 record.lockmode = locallock->tag.mode;
3227
3228                 RegisterTwoPhaseRecord(TWOPHASE_RM_LOCK_ID, 0,
3229                                                            &record, sizeof(TwoPhaseLockRecord));
3230         }
3231 }
3232
3233 /*
3234  * PostPrepare_Locks
3235  *              Clean up after successful PREPARE
3236  *
3237  * Here, we want to transfer ownership of our locks to a dummy PGPROC
3238  * that's now associated with the prepared transaction, and we want to
3239  * clean out the corresponding entries in the LOCALLOCK table.
3240  *
3241  * Note: by removing the LOCALLOCK entries, we are leaving dangling
3242  * pointers in the transaction's resource owner.  This is OK at the
3243  * moment since resowner.c doesn't try to free locks retail at a toplevel
3244  * transaction commit or abort.  We could alternatively zero out nLocks
3245  * and leave the LOCALLOCK entries to be garbage-collected by LockReleaseAll,
3246  * but that probably costs more cycles.
3247  */
3248 void
3249 PostPrepare_Locks(TransactionId xid)
3250 {
3251         PGPROC     *newproc = TwoPhaseGetDummyProc(xid, false);
3252         HASH_SEQ_STATUS status;
3253         LOCALLOCK  *locallock;
3254         LOCK       *lock;
3255         PROCLOCK   *proclock;
3256         PROCLOCKTAG proclocktag;
3257         int                     partition;
3258
3259         /* Can't prepare a lock group follower. */
3260         Assert(MyProc->lockGroupLeader == NULL ||
3261                    MyProc->lockGroupLeader == MyProc);
3262
3263         /* This is a critical section: any error means big trouble */
3264         START_CRIT_SECTION();
3265
3266         /*
3267          * First we run through the locallock table and get rid of unwanted
3268          * entries, then we scan the process's proclocks and transfer them to the
3269          * target proc.
3270          *
3271          * We do this separately because we may have multiple locallock entries
3272          * pointing to the same proclock, and we daren't end up with any dangling
3273          * pointers.
3274          */
3275         hash_seq_init(&status, LockMethodLocalHash);
3276
3277         while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL)
3278         {
3279                 LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
3280                 bool            haveSessionLock;
3281                 bool            haveXactLock;
3282                 int                     i;
3283
3284                 if (locallock->proclock == NULL || locallock->lock == NULL)
3285                 {
3286                         /*
3287                          * We must've run out of shared memory while trying to set up this
3288                          * lock.  Just forget the local entry.
3289                          */
3290                         Assert(locallock->nLocks == 0);
3291                         RemoveLocalLock(locallock);
3292                         continue;
3293                 }
3294
3295                 /* Ignore VXID locks */
3296                 if (locallock->tag.lock.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3297                         continue;
3298
3299                 /* Scan to see whether we hold it at session or transaction level */
3300                 haveSessionLock = haveXactLock = false;
3301                 for (i = locallock->numLockOwners - 1; i >= 0; i--)
3302                 {
3303                         if (lockOwners[i].owner == NULL)
3304                                 haveSessionLock = true;
3305                         else
3306                                 haveXactLock = true;
3307                 }
3308
3309                 /* Ignore it if we have only session lock */
3310                 if (!haveXactLock)
3311                         continue;
3312
3313                 /* This can't happen, because we already checked it */
3314                 if (haveSessionLock)
3315                         ereport(PANIC,
3316                                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
3317                                          errmsg("cannot PREPARE while holding both session-level and transaction-level locks on the same object")));
3318
3319                 /* Mark the proclock to show we need to release this lockmode */
3320                 if (locallock->nLocks > 0)
3321                         locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
3322
3323                 /* And remove the locallock hashtable entry */
3324                 RemoveLocalLock(locallock);
3325         }
3326
3327         /*
3328          * Now, scan each lock partition separately.
3329          */
3330         for (partition = 0; partition < NUM_LOCK_PARTITIONS; partition++)
3331         {
3332                 LWLock     *partitionLock;
3333                 SHM_QUEUE  *procLocks = &(MyProc->myProcLocks[partition]);
3334                 PROCLOCK   *nextplock;
3335
3336                 partitionLock = LockHashPartitionLockByIndex(partition);
3337
3338                 /*
3339                  * If the proclock list for this partition is empty, we can skip
3340                  * acquiring the partition lock.  This optimization is safer than the
3341                  * situation in LockReleaseAll, because we got rid of any fast-path
3342                  * locks during AtPrepare_Locks, so there cannot be any case where
3343                  * another backend is adding something to our lists now.  For safety,
3344                  * though, we code this the same way as in LockReleaseAll.
3345                  */
3346                 if (SHMQueueNext(procLocks, procLocks,
3347                                                  offsetof(PROCLOCK, procLink)) == NULL)
3348                         continue;                       /* needn't examine this partition */
3349
3350                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
3351
3352                 for (proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3353                                                                                                   offsetof(PROCLOCK, procLink));
3354                          proclock;
3355                          proclock = nextplock)
3356                 {
3357                         /* Get link first, since we may unlink/relink this proclock */
3358                         nextplock = (PROCLOCK *)
3359                                 SHMQueueNext(procLocks, &proclock->procLink,
3360                                                          offsetof(PROCLOCK, procLink));
3361
3362                         Assert(proclock->tag.myProc == MyProc);
3363
3364                         lock = proclock->tag.myLock;
3365
3366                         /* Ignore VXID locks */
3367                         if (lock->tag.locktag_type == LOCKTAG_VIRTUALTRANSACTION)
3368                                 continue;
3369
3370                         PROCLOCK_PRINT("PostPrepare_Locks", proclock);
3371                         LOCK_PRINT("PostPrepare_Locks", lock, 0);
3372                         Assert(lock->nRequested >= 0);
3373                         Assert(lock->nGranted >= 0);
3374                         Assert(lock->nGranted <= lock->nRequested);
3375                         Assert((proclock->holdMask & ~lock->grantMask) == 0);
3376
3377                         /* Ignore it if nothing to release (must be a session lock) */
3378                         if (proclock->releaseMask == 0)
3379                                 continue;
3380
3381                         /* Else we should be releasing all locks */
3382                         if (proclock->releaseMask != proclock->holdMask)
3383                                 elog(PANIC, "we seem to have dropped a bit somewhere");
3384
3385                         /*
3386                          * We cannot simply modify proclock->tag.myProc to reassign
3387                          * ownership of the lock, because that's part of the hash key and
3388                          * the proclock would then be in the wrong hash chain.  Instead
3389                          * use hash_update_hash_key.  (We used to create a new hash entry,
3390                          * but that risks out-of-memory failure if other processes are
3391                          * busy making proclocks too.)  We must unlink the proclock from
3392                          * our procLink chain and put it into the new proc's chain, too.
3393                          *
3394                          * Note: the updated proclock hash key will still belong to the
3395                          * same hash partition, cf proclock_hash().  So the partition lock
3396                          * we already hold is sufficient for this.
3397                          */
3398                         SHMQueueDelete(&proclock->procLink);
3399
3400                         /*
3401                          * Create the new hash key for the proclock.
3402                          */
3403                         proclocktag.myLock = lock;
3404                         proclocktag.myProc = newproc;
3405
3406                         /*
3407                          * Update groupLeader pointer to point to the new proc.  (We'd
3408                          * better not be a member of somebody else's lock group!)
3409                          */
3410                         Assert(proclock->groupLeader == proclock->tag.myProc);
3411                         proclock->groupLeader = newproc;
3412
3413                         /*
3414                          * Update the proclock.  We should not find any existing entry for
3415                          * the same hash key, since there can be only one entry for any
3416                          * given lock with my own proc.
3417                          */
3418                         if (!hash_update_hash_key(LockMethodProcLockHash,
3419                                                                           (void *) proclock,
3420                                                                           (void *) &proclocktag))
3421                                 elog(PANIC, "duplicate entry found while reassigning a prepared transaction's locks");
3422
3423                         /* Re-link into the new proc's proclock list */
3424                         SHMQueueInsertBefore(&(newproc->myProcLocks[partition]),
3425                                                                  &proclock->procLink);
3426
3427                         PROCLOCK_PRINT("PostPrepare_Locks: updated", proclock);
3428                 }                                               /* loop over PROCLOCKs within this partition */
3429
3430                 LWLockRelease(partitionLock);
3431         }                                                       /* loop over partitions */
3432
3433         END_CRIT_SECTION();
3434 }
3435
3436
3437 /*
3438  * Estimate shared-memory space used for lock tables
3439  */
3440 Size
3441 LockShmemSize(void)
3442 {
3443         Size            size = 0;
3444         long            max_table_size;
3445
3446         /* lock hash table */
3447         max_table_size = NLOCKENTS();
3448         size = add_size(size, hash_estimate_size(max_table_size, sizeof(LOCK)));
3449
3450         /* proclock hash table */
3451         max_table_size *= 2;
3452         size = add_size(size, hash_estimate_size(max_table_size, sizeof(PROCLOCK)));
3453
3454         /*
3455          * Since NLOCKENTS is only an estimate, add 10% safety margin.
3456          */
3457         size = add_size(size, size / 10);
3458
3459         return size;
3460 }
3461
3462 /*
3463  * GetLockStatusData - Return a summary of the lock manager's internal
3464  * status, for use in a user-level reporting function.
3465  *
3466  * The return data consists of an array of LockInstanceData objects,
3467  * which are a lightly abstracted version of the PROCLOCK data structures,
3468  * i.e. there is one entry for each unique lock and interested PGPROC.
3469  * It is the caller's responsibility to match up related items (such as
3470  * references to the same lockable object or PGPROC) if wanted.
3471  *
3472  * The design goal is to hold the LWLocks for as short a time as possible;
3473  * thus, this function simply makes a copy of the necessary data and releases
3474  * the locks, allowing the caller to contemplate and format the data for as
3475  * long as it pleases.
3476  */
3477 LockData *
3478 GetLockStatusData(void)
3479 {
3480         LockData   *data;
3481         PROCLOCK   *proclock;
3482         HASH_SEQ_STATUS seqstat;
3483         int                     els;
3484         int                     el;
3485         int                     i;
3486
3487         data = (LockData *) palloc(sizeof(LockData));
3488
3489         /* Guess how much space we'll need. */
3490         els = MaxBackends;
3491         el = 0;
3492         data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * els);
3493
3494         /*
3495          * First, we iterate through the per-backend fast-path arrays, locking
3496          * them one at a time.  This might produce an inconsistent picture of the
3497          * system state, but taking all of those LWLocks at the same time seems
3498          * impractical (in particular, note MAX_SIMUL_LWLOCKS).  It shouldn't
3499          * matter too much, because none of these locks can be involved in lock
3500          * conflicts anyway - anything that might must be present in the main lock
3501          * table.  (For the same reason, we don't sweat about making leaderPid
3502          * completely valid.  We cannot safely dereference another backend's
3503          * lockGroupLeader field without holding all lock partition locks, and
3504          * it's not worth that.)
3505          */
3506         for (i = 0; i < ProcGlobal->allProcCount; ++i)
3507         {
3508                 PGPROC     *proc = &ProcGlobal->allProcs[i];
3509                 uint32          f;
3510
3511                 LWLockAcquire(&proc->backendLock, LW_SHARED);
3512
3513                 for (f = 0; f < FP_LOCK_SLOTS_PER_BACKEND; ++f)
3514                 {
3515                         LockInstanceData *instance;
3516                         uint32          lockbits = FAST_PATH_GET_BITS(proc, f);
3517
3518                         /* Skip unallocated slots. */
3519                         if (!lockbits)
3520                                 continue;
3521
3522                         if (el >= els)
3523                         {
3524                                 els += MaxBackends;
3525                                 data->locks = (LockInstanceData *)
3526                                         repalloc(data->locks, sizeof(LockInstanceData) * els);
3527                         }
3528
3529                         instance = &data->locks[el];
3530                         SET_LOCKTAG_RELATION(instance->locktag, proc->databaseId,
3531                                                                  proc->fpRelId[f]);
3532                         instance->holdMask = lockbits << FAST_PATH_LOCKNUMBER_OFFSET;
3533                         instance->waitLockMode = NoLock;
3534                         instance->backend = proc->backendId;
3535                         instance->lxid = proc->lxid;
3536                         instance->pid = proc->pid;
3537                         instance->leaderPid = proc->pid;
3538                         instance->fastpath = true;
3539
3540                         el++;
3541                 }
3542
3543                 if (proc->fpVXIDLock)
3544                 {
3545                         VirtualTransactionId vxid;
3546                         LockInstanceData *instance;
3547
3548                         if (el >= els)
3549                         {
3550                                 els += MaxBackends;
3551                                 data->locks = (LockInstanceData *)
3552                                         repalloc(data->locks, sizeof(LockInstanceData) * els);
3553                         }
3554
3555                         vxid.backendId = proc->backendId;
3556                         vxid.localTransactionId = proc->fpLocalTransactionId;
3557
3558                         instance = &data->locks[el];
3559                         SET_LOCKTAG_VIRTUALTRANSACTION(instance->locktag, vxid);
3560                         instance->holdMask = LOCKBIT_ON(ExclusiveLock);
3561                         instance->waitLockMode = NoLock;
3562                         instance->backend = proc->backendId;
3563                         instance->lxid = proc->lxid;
3564                         instance->pid = proc->pid;
3565                         instance->leaderPid = proc->pid;
3566                         instance->fastpath = true;
3567
3568                         el++;
3569                 }
3570
3571                 LWLockRelease(&proc->backendLock);
3572         }
3573
3574         /*
3575          * Next, acquire lock on the entire shared lock data structure.  We do
3576          * this so that, at least for locks in the primary lock table, the state
3577          * will be self-consistent.
3578          *
3579          * Since this is a read-only operation, we take shared instead of
3580          * exclusive lock.  There's not a whole lot of point to this, because all
3581          * the normal operations require exclusive lock, but it doesn't hurt
3582          * anything either. It will at least allow two backends to do
3583          * GetLockStatusData in parallel.
3584          *
3585          * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3586          */
3587         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3588                 LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3589
3590         /* Now we can safely count the number of proclocks */
3591         data->nelements = el + hash_get_num_entries(LockMethodProcLockHash);
3592         if (data->nelements > els)
3593         {
3594                 els = data->nelements;
3595                 data->locks = (LockInstanceData *)
3596                         repalloc(data->locks, sizeof(LockInstanceData) * els);
3597         }
3598
3599         /* Now scan the tables to copy the data */
3600         hash_seq_init(&seqstat, LockMethodProcLockHash);
3601
3602         while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3603         {
3604                 PGPROC     *proc = proclock->tag.myProc;
3605                 LOCK       *lock = proclock->tag.myLock;
3606                 LockInstanceData *instance = &data->locks[el];
3607
3608                 memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3609                 instance->holdMask = proclock->holdMask;
3610                 if (proc->waitLock == proclock->tag.myLock)
3611                         instance->waitLockMode = proc->waitLockMode;
3612                 else
3613                         instance->waitLockMode = NoLock;
3614                 instance->backend = proc->backendId;
3615                 instance->lxid = proc->lxid;
3616                 instance->pid = proc->pid;
3617                 instance->leaderPid = proclock->groupLeader->pid;
3618                 instance->fastpath = false;
3619
3620                 el++;
3621         }
3622
3623         /*
3624          * And release locks.  We do this in reverse order for two reasons: (1)
3625          * Anyone else who needs more than one of the locks will be trying to lock
3626          * them in increasing order; we don't want to release the other process
3627          * until it can get all the locks it needs. (2) This avoids O(N^2)
3628          * behavior inside LWLockRelease.
3629          */
3630         for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3631                 LWLockRelease(LockHashPartitionLockByIndex(i));
3632
3633         Assert(el == data->nelements);
3634
3635         return data;
3636 }
3637
3638 /*
3639  * GetBlockerStatusData - Return a summary of the lock manager's state
3640  * concerning locks that are blocking the specified PID or any member of
3641  * the PID's lock group, for use in a user-level reporting function.
3642  *
3643  * For each PID within the lock group that is awaiting some heavyweight lock,
3644  * the return data includes an array of LockInstanceData objects, which are
3645  * the same data structure used by GetLockStatusData; but unlike that function,
3646  * this one reports only the PROCLOCKs associated with the lock that that PID
3647  * is blocked on.  (Hence, all the locktags should be the same for any one
3648  * blocked PID.)  In addition, we return an array of the PIDs of those backends
3649  * that are ahead of the blocked PID in the lock's wait queue.  These can be
3650  * compared with the PIDs in the LockInstanceData objects to determine which
3651  * waiters are ahead of or behind the blocked PID in the queue.
3652  *
3653  * If blocked_pid isn't a valid backend PID or nothing in its lock group is
3654  * waiting on any heavyweight lock, return empty arrays.
3655  *
3656  * The design goal is to hold the LWLocks for as short a time as possible;
3657  * thus, this function simply makes a copy of the necessary data and releases
3658  * the locks, allowing the caller to contemplate and format the data for as
3659  * long as it pleases.
3660  */
3661 BlockedProcsData *
3662 GetBlockerStatusData(int blocked_pid)
3663 {
3664         BlockedProcsData *data;
3665         PGPROC     *proc;
3666         int                     i;
3667
3668         data = (BlockedProcsData *) palloc(sizeof(BlockedProcsData));
3669
3670         /*
3671          * Guess how much space we'll need, and preallocate.  Most of the time
3672          * this will avoid needing to do repalloc while holding the LWLocks.  (We
3673          * assume, but check with an Assert, that MaxBackends is enough entries
3674          * for the procs[] array; the other two could need enlargement, though.)
3675          */
3676         data->nprocs = data->nlocks = data->npids = 0;
3677         data->maxprocs = data->maxlocks = data->maxpids = MaxBackends;
3678         data->procs = (BlockedProcData *) palloc(sizeof(BlockedProcData) * data->maxprocs);
3679         data->locks = (LockInstanceData *) palloc(sizeof(LockInstanceData) * data->maxlocks);
3680         data->waiter_pids = (int *) palloc(sizeof(int) * data->maxpids);
3681
3682         /*
3683          * In order to search the ProcArray for blocked_pid and assume that that
3684          * entry won't immediately disappear under us, we must hold ProcArrayLock.
3685          * In addition, to examine the lock grouping fields of any other backend,
3686          * we must hold all the hash partition locks.  (Only one of those locks is
3687          * actually relevant for any one lock group, but we can't know which one
3688          * ahead of time.)      It's fairly annoying to hold all those locks
3689          * throughout this, but it's no worse than GetLockStatusData(), and it
3690          * does have the advantage that we're guaranteed to return a
3691          * self-consistent instantaneous state.
3692          */
3693         LWLockAcquire(ProcArrayLock, LW_SHARED);
3694
3695         proc = BackendPidGetProcWithLock(blocked_pid);
3696
3697         /* Nothing to do if it's gone */
3698         if (proc != NULL)
3699         {
3700                 /*
3701                  * Acquire lock on the entire shared lock data structure.  See notes
3702                  * in GetLockStatusData().
3703                  */
3704                 for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3705                         LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3706
3707                 if (proc->lockGroupLeader == NULL)
3708                 {
3709                         /* Easy case, proc is not a lock group member */
3710                         GetSingleProcBlockerStatusData(proc, data);
3711                 }
3712                 else
3713                 {
3714                         /* Examine all procs in proc's lock group */
3715                         dlist_iter      iter;
3716
3717                         dlist_foreach(iter, &proc->lockGroupLeader->lockGroupMembers)
3718                         {
3719                                 PGPROC     *memberProc;
3720
3721                                 memberProc = dlist_container(PGPROC, lockGroupLink, iter.cur);
3722                                 GetSingleProcBlockerStatusData(memberProc, data);
3723                         }
3724                 }
3725
3726                 /*
3727                  * And release locks.  See notes in GetLockStatusData().
3728                  */
3729                 for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3730                         LWLockRelease(LockHashPartitionLockByIndex(i));
3731
3732                 Assert(data->nprocs <= data->maxprocs);
3733         }
3734
3735         LWLockRelease(ProcArrayLock);
3736
3737         return data;
3738 }
3739
3740 /* Accumulate data about one possibly-blocked proc for GetBlockerStatusData */
3741 static void
3742 GetSingleProcBlockerStatusData(PGPROC *blocked_proc, BlockedProcsData *data)
3743 {
3744         LOCK       *theLock = blocked_proc->waitLock;
3745         BlockedProcData *bproc;
3746         SHM_QUEUE  *procLocks;
3747         PROCLOCK   *proclock;
3748         PROC_QUEUE *waitQueue;
3749         PGPROC     *proc;
3750         int                     queue_size;
3751         int                     i;
3752
3753         /* Nothing to do if this proc is not blocked */
3754         if (theLock == NULL)
3755                 return;
3756
3757         /* Set up a procs[] element */
3758         bproc = &data->procs[data->nprocs++];
3759         bproc->pid = blocked_proc->pid;
3760         bproc->first_lock = data->nlocks;
3761         bproc->first_waiter = data->npids;
3762
3763         /*
3764          * We may ignore the proc's fast-path arrays, since nothing in those could
3765          * be related to a contended lock.
3766          */
3767
3768         /* Collect all PROCLOCKs associated with theLock */
3769         procLocks = &(theLock->procLocks);
3770         proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3771                                                                                  offsetof(PROCLOCK, lockLink));
3772         while (proclock)
3773         {
3774                 PGPROC     *proc = proclock->tag.myProc;
3775                 LOCK       *lock = proclock->tag.myLock;
3776                 LockInstanceData *instance;
3777
3778                 if (data->nlocks >= data->maxlocks)
3779                 {
3780                         data->maxlocks += MaxBackends;
3781                         data->locks = (LockInstanceData *)
3782                                 repalloc(data->locks, sizeof(LockInstanceData) * data->maxlocks);
3783                 }
3784
3785                 instance = &data->locks[data->nlocks];
3786                 memcpy(&instance->locktag, &lock->tag, sizeof(LOCKTAG));
3787                 instance->holdMask = proclock->holdMask;
3788                 if (proc->waitLock == lock)
3789                         instance->waitLockMode = proc->waitLockMode;
3790                 else
3791                         instance->waitLockMode = NoLock;
3792                 instance->backend = proc->backendId;
3793                 instance->lxid = proc->lxid;
3794                 instance->pid = proc->pid;
3795                 instance->leaderPid = proclock->groupLeader->pid;
3796                 instance->fastpath = false;
3797                 data->nlocks++;
3798
3799                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->lockLink,
3800                                                                                          offsetof(PROCLOCK, lockLink));
3801         }
3802
3803         /* Enlarge waiter_pids[] if it's too small to hold all wait queue PIDs */
3804         waitQueue = &(theLock->waitProcs);
3805         queue_size = waitQueue->size;
3806
3807         if (queue_size > data->maxpids - data->npids)
3808         {
3809                 data->maxpids = Max(data->maxpids + MaxBackends,
3810                                                         data->npids + queue_size);
3811                 data->waiter_pids = (int *) repalloc(data->waiter_pids,
3812                                                                                          sizeof(int) * data->maxpids);
3813         }
3814
3815         /* Collect PIDs from the lock's wait queue, stopping at blocked_proc */
3816         proc = (PGPROC *) waitQueue->links.next;
3817         for (i = 0; i < queue_size; i++)
3818         {
3819                 if (proc == blocked_proc)
3820                         break;
3821                 data->waiter_pids[data->npids++] = proc->pid;
3822                 proc = (PGPROC *) proc->links.next;
3823         }
3824
3825         bproc->num_locks = data->nlocks - bproc->first_lock;
3826         bproc->num_waiters = data->npids - bproc->first_waiter;
3827 }
3828
3829 /*
3830  * Returns a list of currently held AccessExclusiveLocks, for use by
3831  * LogStandbySnapshot().  The result is a palloc'd array,
3832  * with the number of elements returned into *nlocks.
3833  *
3834  * XXX This currently takes a lock on all partitions of the lock table,
3835  * but it's possible to do better.  By reference counting locks and storing
3836  * the value in the ProcArray entry for each backend we could tell if any
3837  * locks need recording without having to acquire the partition locks and
3838  * scan the lock table.  Whether that's worth the additional overhead
3839  * is pretty dubious though.
3840  */
3841 xl_standby_lock *
3842 GetRunningTransactionLocks(int *nlocks)
3843 {
3844         xl_standby_lock *accessExclusiveLocks;
3845         PROCLOCK   *proclock;
3846         HASH_SEQ_STATUS seqstat;
3847         int                     i;
3848         int                     index;
3849         int                     els;
3850
3851         /*
3852          * Acquire lock on the entire shared lock data structure.
3853          *
3854          * Must grab LWLocks in partition-number order to avoid LWLock deadlock.
3855          */
3856         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3857                 LWLockAcquire(LockHashPartitionLockByIndex(i), LW_SHARED);
3858
3859         /* Now we can safely count the number of proclocks */
3860         els = hash_get_num_entries(LockMethodProcLockHash);
3861
3862         /*
3863          * Allocating enough space for all locks in the lock table is overkill,
3864          * but it's more convenient and faster than having to enlarge the array.
3865          */
3866         accessExclusiveLocks = palloc(els * sizeof(xl_standby_lock));
3867
3868         /* Now scan the tables to copy the data */
3869         hash_seq_init(&seqstat, LockMethodProcLockHash);
3870
3871         /*
3872          * If lock is a currently granted AccessExclusiveLock then it will have
3873          * just one proclock holder, so locks are never accessed twice in this
3874          * particular case. Don't copy this code for use elsewhere because in the
3875          * general case this will give you duplicate locks when looking at
3876          * non-exclusive lock types.
3877          */
3878         index = 0;
3879         while ((proclock = (PROCLOCK *) hash_seq_search(&seqstat)))
3880         {
3881                 /* make sure this definition matches the one used in LockAcquire */
3882                 if ((proclock->holdMask & LOCKBIT_ON(AccessExclusiveLock)) &&
3883                         proclock->tag.myLock->tag.locktag_type == LOCKTAG_RELATION)
3884                 {
3885                         PGPROC     *proc = proclock->tag.myProc;
3886                         PGXACT     *pgxact = &ProcGlobal->allPgXact[proc->pgprocno];
3887                         LOCK       *lock = proclock->tag.myLock;
3888                         TransactionId xid = pgxact->xid;
3889
3890                         /*
3891                          * Don't record locks for transactions if we know they have
3892                          * already issued their WAL record for commit but not yet released
3893                          * lock. It is still possible that we see locks held by already
3894                          * complete transactions, if they haven't yet zeroed their xids.
3895                          */
3896                         if (!TransactionIdIsValid(xid))
3897                                 continue;
3898
3899                         accessExclusiveLocks[index].xid = xid;
3900                         accessExclusiveLocks[index].dbOid = lock->tag.locktag_field1;
3901                         accessExclusiveLocks[index].relOid = lock->tag.locktag_field2;
3902
3903                         index++;
3904                 }
3905         }
3906
3907         Assert(index <= els);
3908
3909         /*
3910          * And release locks.  We do this in reverse order for two reasons: (1)
3911          * Anyone else who needs more than one of the locks will be trying to lock
3912          * them in increasing order; we don't want to release the other process
3913          * until it can get all the locks it needs. (2) This avoids O(N^2)
3914          * behavior inside LWLockRelease.
3915          */
3916         for (i = NUM_LOCK_PARTITIONS; --i >= 0;)
3917                 LWLockRelease(LockHashPartitionLockByIndex(i));
3918
3919         *nlocks = index;
3920         return accessExclusiveLocks;
3921 }
3922
3923 /* Provide the textual name of any lock mode */
3924 const char *
3925 GetLockmodeName(LOCKMETHODID lockmethodid, LOCKMODE mode)
3926 {
3927         Assert(lockmethodid > 0 && lockmethodid < lengthof(LockMethods));
3928         Assert(mode > 0 && mode <= LockMethods[lockmethodid]->numLockModes);
3929         return LockMethods[lockmethodid]->lockModeNames[mode];
3930 }
3931
3932 #ifdef LOCK_DEBUG
3933 /*
3934  * Dump all locks in the given proc's myProcLocks lists.
3935  *
3936  * Caller is responsible for having acquired appropriate LWLocks.
3937  */
3938 void
3939 DumpLocks(PGPROC *proc)
3940 {
3941         SHM_QUEUE  *procLocks;
3942         PROCLOCK   *proclock;
3943         LOCK       *lock;
3944         int                     i;
3945
3946         if (proc == NULL)
3947                 return;
3948
3949         if (proc->waitLock)
3950                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
3951
3952         for (i = 0; i < NUM_LOCK_PARTITIONS; i++)
3953         {
3954                 procLocks = &(proc->myProcLocks[i]);
3955
3956                 proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
3957                                                                                          offsetof(PROCLOCK, procLink));
3958
3959                 while (proclock)
3960                 {
3961                         Assert(proclock->tag.myProc == proc);
3962
3963                         lock = proclock->tag.myLock;
3964
3965                         PROCLOCK_PRINT("DumpLocks", proclock);
3966                         LOCK_PRINT("DumpLocks", lock, 0);
3967
3968                         proclock = (PROCLOCK *)
3969                                 SHMQueueNext(procLocks, &proclock->procLink,
3970                                                          offsetof(PROCLOCK, procLink));
3971                 }
3972         }
3973 }
3974
3975 /*
3976  * Dump all lmgr locks.
3977  *
3978  * Caller is responsible for having acquired appropriate LWLocks.
3979  */
3980 void
3981 DumpAllLocks(void)
3982 {
3983         PGPROC     *proc;
3984         PROCLOCK   *proclock;
3985         LOCK       *lock;
3986         HASH_SEQ_STATUS status;
3987
3988         proc = MyProc;
3989
3990         if (proc && proc->waitLock)
3991                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
3992
3993         hash_seq_init(&status, LockMethodProcLockHash);
3994
3995         while ((proclock = (PROCLOCK *) hash_seq_search(&status)) != NULL)
3996         {
3997                 PROCLOCK_PRINT("DumpAllLocks", proclock);
3998
3999                 lock = proclock->tag.myLock;
4000                 if (lock)
4001                         LOCK_PRINT("DumpAllLocks", lock, 0);
4002                 else
4003                         elog(LOG, "DumpAllLocks: proclock->tag.myLock = NULL");
4004         }
4005 }
4006 #endif                                                  /* LOCK_DEBUG */
4007
4008 /*
4009  * LOCK 2PC resource manager's routines
4010  */
4011
4012 /*
4013  * Re-acquire a lock belonging to a transaction that was prepared.
4014  *
4015  * Because this function is run at db startup, re-acquiring the locks should
4016  * never conflict with running transactions because there are none.  We
4017  * assume that the lock state represented by the stored 2PC files is legal.
4018  *
4019  * When switching from Hot Standby mode to normal operation, the locks will
4020  * be already held by the startup process. The locks are acquired for the new
4021  * procs without checking for conflicts, so we don't get a conflict between the
4022  * startup process and the dummy procs, even though we will momentarily have
4023  * a situation where two procs are holding the same AccessExclusiveLock,
4024  * which isn't normally possible because the conflict. If we're in standby
4025  * mode, but a recovery snapshot hasn't been established yet, it's possible
4026  * that some but not all of the locks are already held by the startup process.
4027  *
4028  * This approach is simple, but also a bit dangerous, because if there isn't
4029  * enough shared memory to acquire the locks, an error will be thrown, which
4030  * is promoted to FATAL and recovery will abort, bringing down postmaster.
4031  * A safer approach would be to transfer the locks like we do in
4032  * AtPrepare_Locks, but then again, in hot standby mode it's possible for
4033  * read-only backends to use up all the shared lock memory anyway, so that
4034  * replaying the WAL record that needs to acquire a lock will throw an error
4035  * and PANIC anyway.
4036  */
4037 void
4038 lock_twophase_recover(TransactionId xid, uint16 info,
4039                                           void *recdata, uint32 len)
4040 {
4041         TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4042         PGPROC     *proc = TwoPhaseGetDummyProc(xid, false);
4043         LOCKTAG    *locktag;
4044         LOCKMODE        lockmode;
4045         LOCKMETHODID lockmethodid;
4046         LOCK       *lock;
4047         PROCLOCK   *proclock;
4048         PROCLOCKTAG proclocktag;
4049         bool            found;
4050         uint32          hashcode;
4051         uint32          proclock_hashcode;
4052         int                     partition;
4053         LWLock     *partitionLock;
4054         LockMethod      lockMethodTable;
4055
4056         Assert(len == sizeof(TwoPhaseLockRecord));
4057         locktag = &rec->locktag;
4058         lockmode = rec->lockmode;
4059         lockmethodid = locktag->locktag_lockmethodid;
4060
4061         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4062                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4063         lockMethodTable = LockMethods[lockmethodid];
4064
4065         hashcode = LockTagHashCode(locktag);
4066         partition = LockHashPartition(hashcode);
4067         partitionLock = LockHashPartitionLock(hashcode);
4068
4069         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4070
4071         /*
4072          * Find or create a lock with this tag.
4073          */
4074         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4075                                                                                                 (void *) locktag,
4076                                                                                                 hashcode,
4077                                                                                                 HASH_ENTER_NULL,
4078                                                                                                 &found);
4079         if (!lock)
4080         {
4081                 LWLockRelease(partitionLock);
4082                 ereport(ERROR,
4083                                 (errcode(ERRCODE_OUT_OF_MEMORY),
4084                                  errmsg("out of shared memory"),
4085                                  errhint("You might need to increase max_locks_per_transaction.")));
4086         }
4087
4088         /*
4089          * if it's a new lock object, initialize it
4090          */
4091         if (!found)
4092         {
4093                 lock->grantMask = 0;
4094                 lock->waitMask = 0;
4095                 SHMQueueInit(&(lock->procLocks));
4096                 ProcQueueInit(&(lock->waitProcs));
4097                 lock->nRequested = 0;
4098                 lock->nGranted = 0;
4099                 MemSet(lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
4100                 MemSet(lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
4101                 LOCK_PRINT("lock_twophase_recover: new", lock, lockmode);
4102         }
4103         else
4104         {
4105                 LOCK_PRINT("lock_twophase_recover: found", lock, lockmode);
4106                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
4107                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
4108                 Assert(lock->nGranted <= lock->nRequested);
4109         }
4110
4111         /*
4112          * Create the hash key for the proclock table.
4113          */
4114         proclocktag.myLock = lock;
4115         proclocktag.myProc = proc;
4116
4117         proclock_hashcode = ProcLockHashCode(&proclocktag, hashcode);
4118
4119         /*
4120          * Find or create a proclock entry with this tag
4121          */
4122         proclock = (PROCLOCK *) hash_search_with_hash_value(LockMethodProcLockHash,
4123                                                                                                                 (void *) &proclocktag,
4124                                                                                                                 proclock_hashcode,
4125                                                                                                                 HASH_ENTER_NULL,
4126                                                                                                                 &found);
4127         if (!proclock)
4128         {
4129                 /* Oops, not enough shmem for the proclock */
4130                 if (lock->nRequested == 0)
4131                 {
4132                         /*
4133                          * There are no other requestors of this lock, so garbage-collect
4134                          * the lock object.  We *must* do this to avoid a permanent leak
4135                          * of shared memory, because there won't be anything to cause
4136                          * anyone to release the lock object later.
4137                          */
4138                         Assert(SHMQueueEmpty(&(lock->procLocks)));
4139                         if (!hash_search_with_hash_value(LockMethodLockHash,
4140                                                                                          (void *) &(lock->tag),
4141                                                                                          hashcode,
4142                                                                                          HASH_REMOVE,
4143                                                                                          NULL))
4144                                 elog(PANIC, "lock table corrupted");
4145                 }
4146                 LWLockRelease(partitionLock);
4147                 ereport(ERROR,
4148                                 (errcode(ERRCODE_OUT_OF_MEMORY),
4149                                  errmsg("out of shared memory"),
4150                                  errhint("You might need to increase max_locks_per_transaction.")));
4151         }
4152
4153         /*
4154          * If new, initialize the new entry
4155          */
4156         if (!found)
4157         {
4158                 Assert(proc->lockGroupLeader == NULL);
4159                 proclock->groupLeader = proc;
4160                 proclock->holdMask = 0;
4161                 proclock->releaseMask = 0;
4162                 /* Add proclock to appropriate lists */
4163                 SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
4164                 SHMQueueInsertBefore(&(proc->myProcLocks[partition]),
4165                                                          &proclock->procLink);
4166                 PROCLOCK_PRINT("lock_twophase_recover: new", proclock);
4167         }
4168         else
4169         {
4170                 PROCLOCK_PRINT("lock_twophase_recover: found", proclock);
4171                 Assert((proclock->holdMask & ~lock->grantMask) == 0);
4172         }
4173
4174         /*
4175          * lock->nRequested and lock->requested[] count the total number of
4176          * requests, whether granted or waiting, so increment those immediately.
4177          */
4178         lock->nRequested++;
4179         lock->requested[lockmode]++;
4180         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
4181
4182         /*
4183          * We shouldn't already hold the desired lock.
4184          */
4185         if (proclock->holdMask & LOCKBIT_ON(lockmode))
4186                 elog(ERROR, "lock %s on object %u/%u/%u is already held",
4187                          lockMethodTable->lockModeNames[lockmode],
4188                          lock->tag.locktag_field1, lock->tag.locktag_field2,
4189                          lock->tag.locktag_field3);
4190
4191         /*
4192          * We ignore any possible conflicts and just grant ourselves the lock. Not
4193          * only because we don't bother, but also to avoid deadlocks when
4194          * switching from standby to normal mode. See function comment.
4195          */
4196         GrantLock(lock, proclock, lockmode);
4197
4198         /*
4199          * Bump strong lock count, to make sure any fast-path lock requests won't
4200          * be granted without consulting the primary lock table.
4201          */
4202         if (ConflictsWithRelationFastPath(&lock->tag, lockmode))
4203         {
4204                 uint32          fasthashcode = FastPathStrongLockHashPartition(hashcode);
4205
4206                 SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
4207                 FastPathStrongRelationLocks->count[fasthashcode]++;
4208                 SpinLockRelease(&FastPathStrongRelationLocks->mutex);
4209         }
4210
4211         LWLockRelease(partitionLock);
4212 }
4213
4214 /*
4215  * Re-acquire a lock belonging to a transaction that was prepared, when
4216  * starting up into hot standby mode.
4217  */
4218 void
4219 lock_twophase_standby_recover(TransactionId xid, uint16 info,
4220                                                           void *recdata, uint32 len)
4221 {
4222         TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4223         LOCKTAG    *locktag;
4224         LOCKMODE        lockmode;
4225         LOCKMETHODID lockmethodid;
4226
4227         Assert(len == sizeof(TwoPhaseLockRecord));
4228         locktag = &rec->locktag;
4229         lockmode = rec->lockmode;
4230         lockmethodid = locktag->locktag_lockmethodid;
4231
4232         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4233                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4234
4235         if (lockmode == AccessExclusiveLock &&
4236                 locktag->locktag_type == LOCKTAG_RELATION)
4237         {
4238                 StandbyAcquireAccessExclusiveLock(xid,
4239                                                                                   locktag->locktag_field1 /* dboid */ ,
4240                                                                                   locktag->locktag_field2 /* reloid */ );
4241         }
4242 }
4243
4244
4245 /*
4246  * 2PC processing routine for COMMIT PREPARED case.
4247  *
4248  * Find and release the lock indicated by the 2PC record.
4249  */
4250 void
4251 lock_twophase_postcommit(TransactionId xid, uint16 info,
4252                                                  void *recdata, uint32 len)
4253 {
4254         TwoPhaseLockRecord *rec = (TwoPhaseLockRecord *) recdata;
4255         PGPROC     *proc = TwoPhaseGetDummyProc(xid, true);
4256         LOCKTAG    *locktag;
4257         LOCKMETHODID lockmethodid;
4258         LockMethod      lockMethodTable;
4259
4260         Assert(len == sizeof(TwoPhaseLockRecord));
4261         locktag = &rec->locktag;
4262         lockmethodid = locktag->locktag_lockmethodid;
4263
4264         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4265                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4266         lockMethodTable = LockMethods[lockmethodid];
4267
4268         LockRefindAndRelease(lockMethodTable, proc, locktag, rec->lockmode, true);
4269 }
4270
4271 /*
4272  * 2PC processing routine for ROLLBACK PREPARED case.
4273  *
4274  * This is actually just the same as the COMMIT case.
4275  */
4276 void
4277 lock_twophase_postabort(TransactionId xid, uint16 info,
4278                                                 void *recdata, uint32 len)
4279 {
4280         lock_twophase_postcommit(xid, info, recdata, len);
4281 }
4282
4283 /*
4284  *              VirtualXactLockTableInsert
4285  *
4286  *              Take vxid lock via the fast-path.  There can't be any pre-existing
4287  *              lockers, as we haven't advertised this vxid via the ProcArray yet.
4288  *
4289  *              Since MyProc->fpLocalTransactionId will normally contain the same data
4290  *              as MyProc->lxid, you might wonder if we really need both.  The
4291  *              difference is that MyProc->lxid is set and cleared unlocked, and
4292  *              examined by procarray.c, while fpLocalTransactionId is protected by
4293  *              backendLock and is used only by the locking subsystem.  Doing it this
4294  *              way makes it easier to verify that there are no funny race conditions.
4295  *
4296  *              We don't bother recording this lock in the local lock table, since it's
4297  *              only ever released at the end of a transaction.  Instead,
4298  *              LockReleaseAll() calls VirtualXactLockTableCleanup().
4299  */
4300 void
4301 VirtualXactLockTableInsert(VirtualTransactionId vxid)
4302 {
4303         Assert(VirtualTransactionIdIsValid(vxid));
4304
4305         LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4306
4307         Assert(MyProc->backendId == vxid.backendId);
4308         Assert(MyProc->fpLocalTransactionId == InvalidLocalTransactionId);
4309         Assert(MyProc->fpVXIDLock == false);
4310
4311         MyProc->fpVXIDLock = true;
4312         MyProc->fpLocalTransactionId = vxid.localTransactionId;
4313
4314         LWLockRelease(&MyProc->backendLock);
4315 }
4316
4317 /*
4318  *              VirtualXactLockTableCleanup
4319  *
4320  *              Check whether a VXID lock has been materialized; if so, release it,
4321  *              unblocking waiters.
4322  */
4323 void
4324 VirtualXactLockTableCleanup(void)
4325 {
4326         bool            fastpath;
4327         LocalTransactionId lxid;
4328
4329         Assert(MyProc->backendId != InvalidBackendId);
4330
4331         /*
4332          * Clean up shared memory state.
4333          */
4334         LWLockAcquire(&MyProc->backendLock, LW_EXCLUSIVE);
4335
4336         fastpath = MyProc->fpVXIDLock;
4337         lxid = MyProc->fpLocalTransactionId;
4338         MyProc->fpVXIDLock = false;
4339         MyProc->fpLocalTransactionId = InvalidLocalTransactionId;
4340
4341         LWLockRelease(&MyProc->backendLock);
4342
4343         /*
4344          * If fpVXIDLock has been cleared without touching fpLocalTransactionId,
4345          * that means someone transferred the lock to the main lock table.
4346          */
4347         if (!fastpath && LocalTransactionIdIsValid(lxid))
4348         {
4349                 VirtualTransactionId vxid;
4350                 LOCKTAG         locktag;
4351
4352                 vxid.backendId = MyBackendId;
4353                 vxid.localTransactionId = lxid;
4354                 SET_LOCKTAG_VIRTUALTRANSACTION(locktag, vxid);
4355
4356                 LockRefindAndRelease(LockMethods[DEFAULT_LOCKMETHOD], MyProc,
4357                                                          &locktag, ExclusiveLock, false);
4358         }
4359 }
4360
4361 /*
4362  *              VirtualXactLock
4363  *
4364  * If wait = true, wait until the given VXID has been released, and then
4365  * return true.
4366  *
4367  * If wait = false, just check whether the VXID is still running, and return
4368  * true or false.
4369  */
4370 bool
4371 VirtualXactLock(VirtualTransactionId vxid, bool wait)
4372 {
4373         LOCKTAG         tag;
4374         PGPROC     *proc;
4375
4376         Assert(VirtualTransactionIdIsValid(vxid));
4377
4378         SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
4379
4380         /*
4381          * If a lock table entry must be made, this is the PGPROC on whose behalf
4382          * it must be done.  Note that the transaction might end or the PGPROC
4383          * might be reassigned to a new backend before we get around to examining
4384          * it, but it doesn't matter.  If we find upon examination that the
4385          * relevant lxid is no longer running here, that's enough to prove that
4386          * it's no longer running anywhere.
4387          */
4388         proc = BackendIdGetProc(vxid.backendId);
4389         if (proc == NULL)
4390                 return true;
4391
4392         /*
4393          * We must acquire this lock before checking the backendId and lxid
4394          * against the ones we're waiting for.  The target backend will only set
4395          * or clear lxid while holding this lock.
4396          */
4397         LWLockAcquire(&proc->backendLock, LW_EXCLUSIVE);
4398
4399         /* If the transaction has ended, our work here is done. */
4400         if (proc->backendId != vxid.backendId
4401                 || proc->fpLocalTransactionId != vxid.localTransactionId)
4402         {
4403                 LWLockRelease(&proc->backendLock);
4404                 return true;
4405         }
4406
4407         /*
4408          * If we aren't asked to wait, there's no need to set up a lock table
4409          * entry.  The transaction is still in progress, so just return false.
4410          */
4411         if (!wait)
4412         {
4413                 LWLockRelease(&proc->backendLock);
4414                 return false;
4415         }
4416
4417         /*
4418          * OK, we're going to need to sleep on the VXID.  But first, we must set
4419          * up the primary lock table entry, if needed (ie, convert the proc's
4420          * fast-path lock on its VXID to a regular lock).
4421          */
4422         if (proc->fpVXIDLock)
4423         {
4424                 PROCLOCK   *proclock;
4425                 uint32          hashcode;
4426                 LWLock     *partitionLock;
4427
4428                 hashcode = LockTagHashCode(&tag);
4429
4430                 partitionLock = LockHashPartitionLock(hashcode);
4431                 LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4432
4433                 proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc,
4434                                                                         &tag, hashcode, ExclusiveLock);
4435                 if (!proclock)
4436                 {
4437                         LWLockRelease(partitionLock);
4438                         LWLockRelease(&proc->backendLock);
4439                         ereport(ERROR,
4440                                         (errcode(ERRCODE_OUT_OF_MEMORY),
4441                                          errmsg("out of shared memory"),
4442                                          errhint("You might need to increase max_locks_per_transaction.")));
4443                 }
4444                 GrantLock(proclock->tag.myLock, proclock, ExclusiveLock);
4445
4446                 LWLockRelease(partitionLock);
4447
4448                 proc->fpVXIDLock = false;
4449         }
4450
4451         /* Done with proc->fpLockBits */
4452         LWLockRelease(&proc->backendLock);
4453
4454         /* Time to wait. */
4455         (void) LockAcquire(&tag, ShareLock, false, false);
4456
4457         LockRelease(&tag, ShareLock, false);
4458         return true;
4459 }
4460
4461 /*
4462  * LockWaiterCount
4463  *
4464  * Find the number of lock requester on this locktag
4465  */
4466 int
4467 LockWaiterCount(const LOCKTAG *locktag)
4468 {
4469         LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
4470         LOCK       *lock;
4471         bool            found;
4472         uint32          hashcode;
4473         LWLock     *partitionLock;
4474         int                     waiters = 0;
4475
4476         if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
4477                 elog(ERROR, "unrecognized lock method: %d", lockmethodid);
4478
4479         hashcode = LockTagHashCode(locktag);
4480         partitionLock = LockHashPartitionLock(hashcode);
4481         LWLockAcquire(partitionLock, LW_EXCLUSIVE);
4482
4483         lock = (LOCK *) hash_search_with_hash_value(LockMethodLockHash,
4484                                                                                                 (const void *) locktag,
4485                                                                                                 hashcode,
4486                                                                                                 HASH_FIND,
4487                                                                                                 &found);
4488         if (found)
4489         {
4490                 Assert(lock != NULL);
4491                 waiters = lock->nRequested;
4492         }
4493         LWLockRelease(partitionLock);
4494
4495         return waiters;
4496 }