]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lock.c
Clean up lockmanager data structures some more, in preparation for planned
[postgresql] / src / backend / storage / lmgr / lock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.79 2001/01/22 22:30:06 tgl Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll,
27  *      LockResolveConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
31 #include <sys/types.h>
32 #include <unistd.h>
33 #include <signal.h>
34
35 #include "postgres.h"
36
37 #include "access/xact.h"
38 #include "miscadmin.h"
39 #include "storage/proc.h"
40 #include "utils/memutils.h"
41 #include "utils/ps_status.h"
42
43 static int      WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
44                                            LOCK *lock, HOLDER *holder);
45 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
46                                                          int *myHolding);
47 static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
48
/*
 * Printable names of the lock modes, indexed by lock mode number; used
 * as the %s argument of the debug/trace messages in this file.  Slot 0
 * is a placeholder because real lock modes are numbered from 1.
 */
static char *lock_types[] =
{
	"INVALID",					/* 0 */
	"AccessShareLock",			/* 1 */
	"RowShareLock",				/* 2 */
	"RowExclusiveLock",			/* 3 */
	"ShareLock",				/* 4 */
	"ShareRowExclusiveLock",	/* 5 */
	"ExclusiveLock",			/* 6 */
	"AccessExclusiveLock"		/* 7 */
};

/* Text reported when a deadlock is detected. */
static char *DeadLockMessage = "Deadlock detected.\n\tSee the lock(l) manual page for a possible cause.";
62
63
64 #ifdef LOCK_DEBUG
65
66 /*------
67  * The following configuration options are available for lock debugging:
68  *
69  *     trace_locks      -- give a bunch of output what's going on in this file
70  *     trace_userlocks  -- same but for user locks
71  *     trace_lock_oidmin-- do not trace locks for tables below this oid
72  *                         (use to avoid output on system tables)
73  *     trace_lock_table -- trace locks on this table (oid) unconditionally
74  *     debug_deadlocks  -- currently dumps locks at untimely occasions ;)
75  * Furthermore, but in storage/ipc/spin.c:
76  *     trace_spinlocks  -- trace spinlocks (pretty useless)
77  *
78  * Define LOCK_DEBUG at compile time to get all this enabled.
79  */
80
81 int  Trace_lock_oidmin  = BootstrapObjectIdData;
82 bool Trace_locks        = false;
83 bool Trace_userlocks    = false;
84 int  Trace_lock_table   = 0;
85 bool Debug_deadlocks    = false;
86
87
88 inline static bool
89 LOCK_DEBUG_ENABLED(const LOCK * lock)
90 {
91     return
92         (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
93           || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
94          && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
95         || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
96 }
97
98
99 inline static void
100 LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type)
101 {
102         if (LOCK_DEBUG_ENABLED(lock))
103         elog(DEBUG,
104              "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) grantMask(%x) "
105              "req(%d,%d,%d,%d,%d,%d,%d)=%d "
106              "grant(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
107              where, MAKE_OFFSET(lock),
108              lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
109              lock->tag.objId.blkno, lock->grantMask,
110              lock->requested[1], lock->requested[2], lock->requested[3],
111                          lock->requested[4], lock->requested[5], lock->requested[6],
112                          lock->requested[7], lock->nRequested,
113              lock->granted[1], lock->granted[2], lock->granted[3],
114              lock->granted[4], lock->granted[5], lock->granted[6],
115              lock->granted[7], lock->nGranted,
116              lock->waitProcs.size, lock_types[type]);
117 }
118
119
120 inline static void
121 HOLDER_PRINT(const char * where, const HOLDER * holderP)
122 {
123         if (
124         (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
125           || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
126          && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
127                 || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
128         )
129         elog(DEBUG,
130              "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
131              where, MAKE_OFFSET(holderP), holderP->tag.lock,
132                          HOLDER_LOCKMETHOD(*(holderP)),
133              holderP->tag.proc, holderP->tag.xid,
134              holderP->holding[1], holderP->holding[2], holderP->holding[3],
135                          holderP->holding[4], holderP->holding[5], holderP->holding[6],
136                          holderP->holding[7], holderP->nHolding);
137 }
138
139 #else  /* not LOCK_DEBUG */
140
/* Tracing is compiled out: both print hooks expand to nothing. */
#define LOCK_PRINT(whereStr, lockPtr, modeArg)
#define HOLDER_PRINT(whereStr, holderPtr)
143
144 #endif /* not LOCK_DEBUG */
145
146
147
148 SPINLOCK        LockMgrLock;            /* in Shmem or created in
149                                                                  * CreateSpinlocks() */
150
151 /*
152  * These are to simplify/speed up some bit arithmetic.
153  *
154  * XXX is a fetch from a static array really faster than a shift?
155  * Wouldn't bet on it...
156  */
157
158 static LOCKMASK BITS_OFF[MAX_LOCKMODES];
159 static LOCKMASK BITS_ON[MAX_LOCKMODES];
160
161 /* -----------------
162  * Disable flag
163  * -----------------
164  */
165 static bool LockingIsDisabled;
166
167 /* -------------------
168  * map from lockmethod to the lock table structure
169  * -------------------
170  */
171 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
172
173 static int      NumLockMethods;
174
175 /* -------------------
176  * InitLocks -- Init the lock module.  Create a private data
177  *              structure for constructing conflict masks.
178  * -------------------
179  */
180 void
181 InitLocks(void)
182 {
183         int                     i;
184         int                     bit;
185
186         bit = 1;
187         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
188         {
189                 BITS_ON[i] = bit;
190                 BITS_OFF[i] = ~bit;
191         }
192 }
193
194 /* -------------------
195  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
196  * ------------------
197  */
198 void
199 LockDisable(bool status)
200 {
201         LockingIsDisabled = status;
202 }
203
204 /* -----------------
205  * Boolean function to determine current locking status
206  * -----------------
207  */
208 bool
209 LockingDisabled(void)
210 {
211         return LockingIsDisabled;
212 }
213
214
215 /*
216  * LockMethodInit -- initialize the lock table's lock type
217  *              structures
218  *
219  * Notes: just copying.  Should only be called once.
220  */
221 static void
222 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
223                            LOCKMASK *conflictsP,
224                            int *prioP,
225                            int numModes)
226 {
227         int                     i;
228
229         lockMethodTable->ctl->numLockModes = numModes;
230         numModes++;
231         for (i = 0; i < numModes; i++, prioP++, conflictsP++)
232         {
233                 lockMethodTable->ctl->conflictTab[i] = *conflictsP;
234                 lockMethodTable->ctl->prio[i] = *prioP;
235         }
236 }
237
238 /*
239  * LockMethodTableInit -- initialize a lock table structure
240  *
241  * Notes:
242  *              (a) a lock table has four separate entries in the shmem index
243  *              table.  This is because every shared hash table and spinlock
244  *              has its name stored in the shmem index at its creation.  It
245  *              is wasteful, in this case, but not much space is involved.
246  *
247  * NOTE: data structures allocated here are allocated permanently, using
248  * TopMemoryContext and shared memory.  We don't ever release them anyway,
249  * and in normal multi-backend operation the lock table structures set up
250  * by the postmaster are inherited by each backend, so they must be in
251  * TopMemoryContext.
252  */
253 LOCKMETHOD
254 LockMethodTableInit(char *tabName,
255                                         LOCKMASK *conflictsP,
256                                         int *prioP,
257                                         int numModes,
258                                         int maxBackends)
259 {
260         LOCKMETHODTABLE *lockMethodTable;
261         char       *shmemName;
262         HASHCTL         info;
263         int                     hash_flags;
264         bool            found;
265         long            init_table_size,
266                                 max_table_size;
267
268         if (numModes > MAX_LOCKMODES)
269         {
270                 elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
271                          numModes, MAX_LOCKMODES);
272                 return INVALID_LOCKMETHOD;
273         }
274
275         /* Compute init/max size to request for lock hashtables */
276         max_table_size = NLOCKENTS(maxBackends);
277         init_table_size = max_table_size / 10;
278
279         /* Allocate a string for the shmem index table lookups. */
280         /* This is just temp space in this routine, so palloc is OK. */
281         shmemName = (char *) palloc(strlen(tabName) + 32);
282
283         /* each lock table has a non-shared, permanent header */
284         lockMethodTable = (LOCKMETHODTABLE *)
285                 MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODTABLE));
286
287         /* ------------------------
288          * find/acquire the spinlock for the table
289          * ------------------------
290          */
291         SpinAcquire(LockMgrLock);
292
293         /* -----------------------
294          * allocate a control structure from shared memory or attach to it
295          * if it already exists.
296          * -----------------------
297          */
298         sprintf(shmemName, "%s (ctl)", tabName);
299         lockMethodTable->ctl = (LOCKMETHODCTL *)
300                 ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);
301
302         if (!lockMethodTable->ctl)
303                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
304
305         /* -------------------
306          * no zero-th table
307          * -------------------
308          */
309         NumLockMethods = 1;
310
311         /* ----------------
312          * we're first - initialize
313          * ----------------
314          */
315         if (!found)
316         {
317                 MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
318                 lockMethodTable->ctl->masterLock = LockMgrLock;
319                 lockMethodTable->ctl->lockmethod = NumLockMethods;
320         }
321
322         /* --------------------
323          * other modules refer to the lock table by a lockmethod ID
324          * --------------------
325          */
326         LockMethodTable[NumLockMethods] = lockMethodTable;
327         NumLockMethods++;
328         Assert(NumLockMethods <= MAX_LOCK_METHODS);
329
330         /* ----------------------
331          * allocate a hash table for LOCK structs.  This is used
332          * to store per-locked-object information.
333          * ----------------------
334          */
335         info.keysize = SHMEM_LOCKTAB_KEYSIZE;
336         info.datasize = SHMEM_LOCKTAB_DATASIZE;
337         info.hash = tag_hash;
338         hash_flags = (HASH_ELEM | HASH_FUNCTION);
339
340         sprintf(shmemName, "%s (lock hash)", tabName);
341         lockMethodTable->lockHash = ShmemInitHash(shmemName,
342                                                                                           init_table_size,
343                                                                                           max_table_size,
344                                                                                           &info,
345                                                                                           hash_flags);
346
347         if (!lockMethodTable->lockHash)
348                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
349         Assert(lockMethodTable->lockHash->hash == tag_hash);
350
351         /* -------------------------
352          * allocate a hash table for HOLDER structs.  This is used
353          * to store per-lock-holder information.
354          * -------------------------
355          */
356         info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
357         info.datasize = SHMEM_HOLDERTAB_DATASIZE;
358         info.hash = tag_hash;
359         hash_flags = (HASH_ELEM | HASH_FUNCTION);
360
361         sprintf(shmemName, "%s (holder hash)", tabName);
362         lockMethodTable->holderHash = ShmemInitHash(shmemName,
363                                                                                                 init_table_size,
364                                                                                                 max_table_size,
365                                                                                                 &info,
366                                                                                                 hash_flags);
367
368         if (!lockMethodTable->holderHash)
369                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
370
371         /* init ctl data structures */
372         LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
373
374         SpinRelease(LockMgrLock);
375
376         pfree(shmemName);
377
378         return lockMethodTable->ctl->lockmethod;
379 }
380
381 /*
382  * LockMethodTableRename -- allocate another lockmethod ID to the same
383  *              lock table.
384  *
385  * NOTES: Both the lock module and the lock chain (lchain.c)
386  *              module use table id's to distinguish between different
387  *              kinds of locks.  Short term and long term locks look
388  *              the same to the lock table, but are handled differently
389  *              by the lock chain manager.      This function allows the
390  *              client to use different lockmethods when acquiring/releasing
391  *              short term and long term locks, yet store them all in one hashtable.
392  */
393
394 LOCKMETHOD
395 LockMethodTableRename(LOCKMETHOD lockmethod)
396 {
397         LOCKMETHOD      newLockMethod;
398
399         if (NumLockMethods >= MAX_LOCK_METHODS)
400                 return INVALID_LOCKMETHOD;
401         if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
402                 return INVALID_LOCKMETHOD;
403
404         /* other modules refer to the lock table by a lockmethod ID */
405         newLockMethod = NumLockMethods;
406         NumLockMethods++;
407
408         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
409         return newLockMethod;
410 }
411
412 /*
413  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
414  *              set lock if/when no conflicts.
415  *
416  * Returns: TRUE if parameters are correct, FALSE otherwise.
417  *
418  * Side Effects: The lock is always acquired.  No way to abort
419  *              a lock acquisition other than aborting the transaction.
420  *              Lock is recorded in the lkchain.
421  *
422  *
423  * Note on User Locks:
424  *
425  *              User locks are handled totally on the application side as
426  *              long term cooperative locks which extend beyond the normal
427  *              transaction boundaries.  Their purpose is to indicate to an
428  *              application that someone is `working' on an item.  So it is
429  *              possible to put an user lock on a tuple's oid, retrieve the
430  *              tuple, work on it for an hour and then update it and remove
431  *              the lock.  While the lock is active other clients can still
432  *              read and write the tuple but they can be aware that it has
433  *              been locked at the application level by someone.
434  *              User locks use lock tags made of an uint16 and an uint32, for
435  *              example 0 and a tuple oid, or any other arbitrary pair of
436  *              numbers following a convention established by the application.
437  *              In this sense tags don't refer to tuples or database entities.
438  *              User locks and normal locks are completely orthogonal and
439  *              they don't interfere with each other, so it is possible
440  *              to acquire a normal lock on an user-locked tuple or user-lock
441  *              a tuple for which a normal write lock already exists.
442  *              User locks are always non blocking, therefore they are never
443  *              acquired if already held by another process.  They must be
444  *              released explicitly by the application but they are released
445  *              automatically when a backend terminates.
446  *              They are indicated by a lockmethod 2 which is an alias for the
447  *              normal lock table, and are distinguished from normal locks
448  *              by the following differences:
449  *
450  *                                                                              normal lock             user lock
451  *
452  *              lockmethod                                              1                               2
453  *              tag.dbId                                                database oid    database oid
454  *              tag.relId                                               rel oid or 0    0
455  *              tag.objId                                               block id                lock id2
456  *                                                                              or xact id
457  *              tag.offnum                                              0                               lock id1
458  *              holder.xid                                              xid or 0                0
459  *              persistence                                             transaction             user or backend
460  *                                                                              or backend
461  *
462  *              The lockmode parameter can have the same values for normal locks
463  *              although probably only WRITE_LOCK can have some practical use.
464  *
465  *                                                                                                              DZ - 22 Nov 1997
466  */
467
468 bool
469 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
470                         TransactionId xid, LOCKMODE lockmode)
471 {
472         HOLDER     *holder;
473         HOLDERTAG       holdertag;
474         HTAB       *holderTable;
475         bool            found;
476         LOCK       *lock;
477         SPINLOCK        masterLock;
478         LOCKMETHODTABLE *lockMethodTable;
479         int                     status;
480         int                     myHolding[MAX_LOCKMODES];
481         int                     i;
482
483 #ifdef LOCK_DEBUG
484         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
485                 elog(DEBUG, "LockAcquire: user lock [%u] %s",
486                          locktag->objId.blkno, lock_types[lockmode]);
487 #endif
488
489         /* ???????? This must be changed when short term locks will be used */
490         locktag->lockmethod = lockmethod;
491
492         Assert(lockmethod < NumLockMethods);
493         lockMethodTable = LockMethodTable[lockmethod];
494         if (!lockMethodTable)
495         {
496                 elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
497                 return FALSE;
498         }
499
500         if (LockingIsDisabled)
501                 return TRUE;
502
503         masterLock = lockMethodTable->ctl->masterLock;
504
505         SpinAcquire(masterLock);
506
507         /*
508          * Find or create a lock with this tag
509          */
510         Assert(lockMethodTable->lockHash->hash == tag_hash);
511         lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
512                                                                 HASH_ENTER, &found);
513         if (!lock)
514         {
515                 SpinRelease(masterLock);
516                 elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
517                 return FALSE;
518         }
519
520         /* --------------------
521          * if it's a new lock object, initialize it
522          * --------------------
523          */
524         if (!found)
525         {
526                 lock->grantMask = 0;
527                 lock->waitMask = 0;
528                 SHMQueueInit(&(lock->lockHolders));
529                 ProcQueueInit(&(lock->waitProcs));
530                 lock->nRequested = 0;
531                 lock->nGranted = 0;
532                 MemSet((char *) lock->requested, 0, sizeof(int) * MAX_LOCKMODES);
533                 MemSet((char *) lock->granted, 0, sizeof(int) * MAX_LOCKMODES);
534                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
535         }
536         else
537         {
538                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
539                 Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
540                 Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
541                 Assert(lock->nGranted <= lock->nRequested);
542         }
543
544         /* ------------------
545          * Create the hash key for the holder table.
546          * ------------------
547          */
548         MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
549         holdertag.lock = MAKE_OFFSET(lock);
550         holdertag.proc = MAKE_OFFSET(MyProc);
551         TransactionIdStore(xid, &holdertag.xid);
552
553         /*
554          * Find or create a holder entry with this tag
555          */
556         holderTable = lockMethodTable->holderHash;
557         holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
558                                                                         HASH_ENTER, &found);
559         if (!holder)
560         {
561                 SpinRelease(masterLock);
562                 elog(NOTICE, "LockAcquire: holder table corrupted");
563                 return FALSE;
564         }
565
566         /*
567          * If new, initialize the new entry
568          */
569         if (!found)
570         {
571                 holder->nHolding = 0;
572                 MemSet((char *) holder->holding, 0, sizeof(int) * MAX_LOCKMODES);
573                 /* Add holder to appropriate lists */
574                 SHMQueueInsertBefore(&lock->lockHolders, &holder->lockLink);
575                 SHMQueueInsertBefore(&MyProc->procHolders, &holder->procLink);
576                 HOLDER_PRINT("LockAcquire: new", holder);
577         }
578         else
579         {
580                 HOLDER_PRINT("LockAcquire: found", holder);
581                 Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
582                 Assert(holder->nHolding <= lock->nGranted);
583
584 #ifdef CHECK_DEADLOCK_RISK
585                 /*
586                  * Issue warning if we already hold a lower-level lock on this
587                  * object and do not hold a lock of the requested level or higher.
588                  * This indicates a deadlock-prone coding practice (eg, we'd have
589                  * a deadlock if another backend were following the same code path
590                  * at about the same time).
591                  *
592                  * This is not enabled by default, because it may generate log entries
593                  * about user-level coding practices that are in fact safe in context.
594                  * It can be enabled to help find system-level problems.
595                  *
596                  * XXX Doing numeric comparison on the lockmodes is a hack;
597                  * it'd be better to use a table.  For now, though, this works.
598                  */
599                 for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
600                 {
601                         if (holder->holding[i] > 0)
602                         {
603                                 if (i >= (int) lockmode)
604                                         break;          /* safe: we have a lock >= req level */
605                                 elog(DEBUG, "Deadlock risk: raising lock level"
606                                          " from %s to %s on object %u/%u/%u",
607                                          lock_types[i], lock_types[lockmode],
608                                          lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
609                                 break;
610                         }
611                 }
612 #endif /* CHECK_DEADLOCK_RISK */
613         }
614
615         /* ----------------
616          * lock->nRequested and lock->requested[] count the total number of
617          * requests, whether granted or waiting, so increment those immediately.
618          * The other counts don't increment till we get the lock.
619          * ----------------
620          */
621         lock->nRequested++;
622         lock->requested[lockmode]++;
623         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
624
625         /* --------------------
626          * If I'm the only one holding any lock on this object, then there
627          * cannot be a conflict. The same is true if I already hold this lock.
628          * --------------------
629          */
630         if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0)
631         {
632                 GrantLock(lock, holder, lockmode);
633                 HOLDER_PRINT("LockAcquire: owning", holder);
634                 SpinRelease(masterLock);
635                 return TRUE;
636         }
637
638         /* --------------------
639          * If this process (under any XID) is a holder of the lock,
640          * then there is no conflict, either.
641          * --------------------
642          */
643         LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
644         if (myHolding[lockmode] != 0)
645         {
646                 GrantLock(lock, holder, lockmode);
647                 HOLDER_PRINT("LockAcquire: my other XID owning", holder);
648                 SpinRelease(masterLock);
649                 return TRUE;
650         }
651
652         /*
653          * If lock requested conflicts with locks requested by waiters...
654          */
655         if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
656         {
657                 /*
658                  * If my process doesn't hold any locks that conflict with waiters
659                  * then force to sleep, so that prior waiters get first chance.
660                  */
661                 for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
662                 {
663                         if (myHolding[i] > 0 &&
664                                 lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
665                                 break;                  /* yes, there is a conflict */
666                 }
667
668                 if (i > lockMethodTable->ctl->numLockModes)
669                 {
670                         HOLDER_PRINT("LockAcquire: another proc already waiting",
671                                                  holder);
672                         status = STATUS_FOUND;
673                 }
674                 else
675                         status = LockResolveConflicts(lockmethod, lockmode,
676                                                                                   lock, holder,
677                                                                                   MyProc, myHolding);
678         }
679         else
680                 status = LockResolveConflicts(lockmethod, lockmode,
681                                                                           lock, holder,
682                                                                           MyProc, myHolding);
683
684         if (status == STATUS_OK)
685                 GrantLock(lock, holder, lockmode);
686         else if (status == STATUS_FOUND)
687         {
688 #ifdef USER_LOCKS
689
690                 /*
691                  * User locks are non blocking. If we can't acquire a lock we must
692                  * remove the holder entry and return FALSE without waiting.
693                  */
694                 if (lockmethod == USER_LOCKMETHOD)
695                 {
696                         if (holder->nHolding == 0)
697                         {
698                                 SHMQueueDelete(&holder->lockLink);
699                                 SHMQueueDelete(&holder->procLink);
700                                 holder = (HOLDER *) hash_search(holderTable,
701                                                                                                 (Pointer) holder,
702                                                                                                 HASH_REMOVE, &found);
703                                 if (!holder || !found)
704                                         elog(NOTICE, "LockAcquire: remove holder, table corrupted");
705                         }
706                         else
707                                 HOLDER_PRINT("LockAcquire: NHOLDING", holder);
708                         lock->nRequested--;
709                         lock->requested[lockmode]--;
710                         LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
711                         Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0));
712                         Assert(lock->nGranted <= lock->nRequested);
713                         SpinRelease(masterLock);
714                         return FALSE;
715                 }
716 #endif /* USER_LOCKS */
717
718                 /*
719                  * Construct bitmask of locks this process holds on this object.
720                  */
721                 {
722                         int                     heldLocks = 0;
723                         int                     tmpMask;
724
725                         for (i = 1, tmpMask = 2;
726                                  i <= lockMethodTable->ctl->numLockModes;
727                                  i++, tmpMask <<= 1)
728                         {
729                                 if (myHolding[i] > 0)
730                                         heldLocks |= tmpMask;
731                         }
732                         MyProc->heldLocks = heldLocks;
733                 }
734
735                 /*
736                  * Sleep till someone wakes me up.
737                  */
738                 status = WaitOnLock(lockmethod, lockmode, lock, holder);
739
740                 /*
741                  * NOTE: do not do any material change of state between here and
742                  * return.  All required changes in locktable state must have been
743                  * done when the lock was granted to us --- see notes in WaitOnLock.
744                  */
745
746                 /*
747                  * Check the holder entry status, in case something in the ipc
748                  * communication doesn't work correctly.
749                  */
750                 if (!((holder->nHolding > 0) && (holder->holding[lockmode] > 0)))
751                 {
752                         HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
753                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
754                         /* Should we retry ? */
755                         SpinRelease(masterLock);
756                         return FALSE;
757                 }
758                 HOLDER_PRINT("LockAcquire: granted", holder);
759                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
760         }
761
762         SpinRelease(masterLock);
763
764         return status == STATUS_OK;
765 }
766
767 /* ----------------------------
768  * LockResolveConflicts -- test for lock conflicts
769  *
770  * NOTES:
771  *              Here's what makes this complicated: one transaction's
772  * locks don't conflict with one another.  When many processes
773  * hold locks, each has to subtract off the other's locks when
774  * determining whether or not any new lock acquired conflicts with
775  * the old ones.
776  *
777  * The caller can optionally pass the process's total holding counts, if
778  * known.  If NULL is passed then these values will be computed internally.
779  * ----------------------------
780  */
781 int
782 LockResolveConflicts(LOCKMETHOD lockmethod,
783                                          LOCKMODE lockmode,
784                                          LOCK *lock,
785                                          HOLDER *holder,
786                                          PROC *proc,
787                                          int *myHolding)                /* myHolding[] array or NULL */
788 {
789         LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
790         int                     numLockModes = lockctl->numLockModes;
791         int                     bitmask;
792         int                     i,
793                                 tmpMask;
794         int                     localHolding[MAX_LOCKMODES];
795
796         Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
797
798         /* ----------------------------
799          * first check for global conflicts: If no locks conflict
800          * with mine, then I get the lock.
801          *
802          * Checking for conflict: lock->grantMask represents the types of
803          * currently held locks.  conflictTable[lockmode] has a bit
804          * set for each type of lock that conflicts with mine.  Bitwise
805          * compare tells if there is a conflict.
806          * ----------------------------
807          */
808         if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
809         {
810                 HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
811                 return STATUS_OK;
812         }
813
814         /* ------------------------
815          * Rats.  Something conflicts. But it could still be my own
816          * lock.  We have to construct a conflict mask
817          * that does not reflect our own locks.  Locks held by the current
818          * process under another XID also count as "our own locks".
819          * ------------------------
820          */
821         if (myHolding == NULL)
822         {
823                 /* Caller didn't do calculation of total holding for me */
824                 LockCountMyLocks(holder->tag.lock, proc, localHolding);
825                 myHolding = localHolding;
826         }
827
828         /* Compute mask of lock types held by other processes */
829         bitmask = 0;
830         tmpMask = 2;
831         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
832         {
833                 if (lock->granted[i] != myHolding[i])
834                         bitmask |= tmpMask;
835         }
836
837         /* ------------------------
838          * now check again for conflicts.  'bitmask' describes the types
839          * of locks held by other processes.  If one of these
840          * conflicts with the kind of lock that I want, there is a
841          * conflict and I have to sleep.
842          * ------------------------
843          */
844         if (!(lockctl->conflictTab[lockmode] & bitmask))
845         {
846                 /* no conflict. OK to get the lock */
847                 HOLDER_PRINT("LockResolveConflicts: resolved", holder);
848                 return STATUS_OK;
849         }
850
851         HOLDER_PRINT("LockResolveConflicts: conflicting", holder);
852         return STATUS_FOUND;
853 }
854
855 /*
856  * LockCountMyLocks --- Count total number of locks held on a given lockable
857  *              object by a given process (under any transaction ID).
858  *
859  * XXX This could be rather slow if the process holds a large number of locks.
860  * Perhaps it could be sped up if we kept yet a third hashtable of per-
861  * process lock information.  However, for the normal case where a transaction
862  * doesn't hold a large number of locks, keeping such a table would probably
863  * be a net slowdown.
864  */
865 static void
866 LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
867 {
868         SHM_QUEUE  *procHolders = &(proc->procHolders);
869         HOLDER     *holder;
870         int                     i;
871
872         MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
873
874         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
875                                                                          offsetof(HOLDER, procLink));
876
877         while (holder)
878         {
879                 if (lockOffset == holder->tag.lock)
880                 {
881                         for (i = 1; i < MAX_LOCKMODES; i++)
882                         {
883                                 myHolding[i] += holder->holding[i];
884                         }
885                 }
886
887                 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
888                                                                                  offsetof(HOLDER, procLink));
889         }
890 }
891
892 /*
893  * LockGetMyHeldLocks -- compute bitmask of lock types held by a process
894  *              for a given lockable object.
895  */
896 static int
897 LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
898 {
899         int                     myHolding[MAX_LOCKMODES];
900         int                     heldLocks = 0;
901         int                     i,
902                                 tmpMask;
903
904         LockCountMyLocks(lockOffset, proc, myHolding);
905
906         for (i = 1, tmpMask = 2;
907                  i < MAX_LOCKMODES;
908                  i++, tmpMask <<= 1)
909         {
910                 if (myHolding[i] > 0)
911                         heldLocks |= tmpMask;
912         }
913         return heldLocks;
914 }
915
916 /*
917  * GrantLock -- update the lock and holder data structures to show
918  *              the lock request has been granted.
919  */
920 void
921 GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
922 {
923         lock->nGranted++;
924         lock->granted[lockmode]++;
925         lock->grantMask |= BITS_ON[lockmode];
926         if (lock->granted[lockmode] == lock->requested[lockmode])
927                 lock->waitMask &= BITS_OFF[lockmode];
928         LOCK_PRINT("GrantLock", lock, lockmode);
929         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
930         Assert(lock->nGranted <= lock->nRequested);
931         holder->holding[lockmode]++;
932         holder->nHolding++;
933         Assert((holder->nHolding > 0) && (holder->holding[lockmode] > 0));
934 }
935
936 /*
937  * WaitOnLock -- wait to acquire a lock
938  *
939  * The locktable spinlock must be held at entry.
940  */
941 static int
942 WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
943                    LOCK *lock, HOLDER *holder)
944 {
945         LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
946         char       *new_status,
947                            *old_status;
948
949         Assert(lockmethod < NumLockMethods);
950
951         LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
952
953         old_status = pstrdup(get_ps_display());
954         new_status = (char *) palloc(strlen(old_status) + 10);
955         strcpy(new_status, old_status);
956         strcat(new_status, " waiting");
957         set_ps_display(new_status);
958
959         /*
960          * NOTE: Think not to put any lock state cleanup after the call to
961          * ProcSleep, in either the normal or failure path.  The lock state
962          * must be fully set by the lock grantor, or by HandleDeadLock if we
963          * give up waiting for the lock.  This is necessary because of the
964          * possibility that a cancel/die interrupt will interrupt ProcSleep
965          * after someone else grants us the lock, but before we've noticed it.
966          * Hence, after granting, the locktable state must fully reflect the
967          * fact that we own the lock; we can't do additional work on return.
968          */
969
970         if (ProcSleep(lockMethodTable->ctl,
971                                   lockmode,
972                                   lock,
973                                   holder) != NO_ERROR)
974         {
975                 /* -------------------
976                  * We failed as a result of a deadlock, see HandleDeadLock().
977                  * Quit now.  Removal of the holder and lock objects, if no longer
978                  * needed, will happen in xact cleanup (see above for motivation).
979                  * -------------------
980                  */
981                 LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
982                 SpinRelease(lockMethodTable->ctl->masterLock);
983                 elog(ERROR, DeadLockMessage);
984                 /* not reached */
985         }
986
987         set_ps_display(old_status);
988         pfree(old_status);
989         pfree(new_status);
990
991         LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
992         return STATUS_OK;
993 }
994
995 /*
996  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
997  *              release it.
998  *
999  * Side Effects: if the lock no longer conflicts with the highest
1000  *              priority waiting process, that process is granted the lock
1001  *              and awoken. (We have to grant the lock here to avoid a
1002  *              race between the waking process and any new process to
1003  *              come along and request the lock.)
1004  */
1005 bool
1006 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
1007                         TransactionId xid, LOCKMODE lockmode)
1008 {
1009         LOCK       *lock;
1010         SPINLOCK        masterLock;
1011         bool            found;
1012         LOCKMETHODTABLE *lockMethodTable;
1013         HOLDER     *holder;
1014         HOLDERTAG       holdertag;
1015         HTAB       *holderTable;
1016         bool            wakeupNeeded = true;
1017
1018 #ifdef LOCK_DEBUG
1019         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
1020         elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
1021 #endif
1022
1023         /* ???????? This must be changed when short term locks will be used */
1024         locktag->lockmethod = lockmethod;
1025
1026         Assert(lockmethod < NumLockMethods);
1027         lockMethodTable = LockMethodTable[lockmethod];
1028         if (!lockMethodTable)
1029         {
1030                 elog(NOTICE, "lockMethodTable is null in LockRelease");
1031                 return FALSE;
1032         }
1033
1034         if (LockingIsDisabled)
1035                 return TRUE;
1036
1037         masterLock = lockMethodTable->ctl->masterLock;
1038         SpinAcquire(masterLock);
1039
1040         /*
1041          * Find a lock with this tag
1042          */
1043         Assert(lockMethodTable->lockHash->hash == tag_hash);
1044         lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
1045                                                                 HASH_FIND, &found);
1046
1047         /*
1048          * let the caller print its own error message, too. Do not
1049          * elog(ERROR).
1050          */
1051         if (!lock)
1052         {
1053                 SpinRelease(masterLock);
1054                 elog(NOTICE, "LockRelease: locktable corrupted");
1055                 return FALSE;
1056         }
1057
1058         if (!found)
1059         {
1060                 SpinRelease(masterLock);
1061                 elog(NOTICE, "LockRelease: no such lock");
1062                 return FALSE;
1063         }
1064         LOCK_PRINT("LockRelease: found", lock, lockmode);
1065
1066         /*
1067          * Find the holder entry for this holder.
1068          */
1069         MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
1070         holdertag.lock = MAKE_OFFSET(lock);
1071         holdertag.proc = MAKE_OFFSET(MyProc);
1072         TransactionIdStore(xid, &holdertag.xid);
1073
1074         holderTable = lockMethodTable->holderHash;
1075         holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
1076                                                                         HASH_FIND_SAVE, &found);
1077         if (!holder || !found)
1078         {
1079                 SpinRelease(masterLock);
1080 #ifdef USER_LOCKS
1081                 if (!found && lockmethod == USER_LOCKMETHOD)
1082             elog(NOTICE, "LockRelease: no lock with this tag");
1083                 else
1084 #endif
1085                         elog(NOTICE, "LockRelease: holder table corrupted");
1086                 return FALSE;
1087         }
1088         HOLDER_PRINT("LockRelease: found", holder);
1089         Assert(holder->tag.lock == MAKE_OFFSET(lock));
1090
1091         /*
1092          * Check that we are actually holding a lock of the type we want to
1093          * release.
1094          */
1095         if (!(holder->holding[lockmode] > 0))
1096         {
1097                 SpinRelease(masterLock);
1098                 HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
1099                 elog(NOTICE, "LockRelease: you don't own a lock of type %s",
1100                          lock_types[lockmode]);
1101                 Assert(holder->holding[lockmode] >= 0);
1102                 return FALSE;
1103         }
1104         Assert(holder->nHolding > 0);
1105         Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
1106         Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0));
1107         Assert(lock->nGranted <= lock->nRequested);
1108
1109         /*
1110          * fix the general lock stats
1111          */
1112         lock->nRequested--;
1113         lock->requested[lockmode]--;
1114         lock->nGranted--;
1115         lock->granted[lockmode]--;
1116
1117         if (lock->granted[lockmode] == 0)
1118         {
1119                 /* change the conflict mask.  No more of this lock type. */
1120                 lock->grantMask &= BITS_OFF[lockmode];
1121         }
1122
1123 #ifdef NOT_USED
1124         /* --------------------------
1125          * If there are still active locks of the type I just released, no one
1126          * should be woken up.  Whoever is asleep will still conflict
1127          * with the remaining locks.
1128          * --------------------------
1129          */
1130         if (lock->granted[lockmode])
1131                 wakeupNeeded = false;
1132         else
1133 #endif
1134
1135                 /*
1136                  * Above is not valid any more (due to MVCC lock modes). Actually
1137                  * we should compare granted[lockmode] with number of
1138                  * waiters holding lock of this type and try to wakeup only if
1139                  * these numbers are equal (and lock released conflicts with locks
1140                  * requested by waiters). For the moment we only check the last
1141                  * condition.
1142                  */
1143         if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
1144                 wakeupNeeded = true;
1145
1146         LOCK_PRINT("LockRelease: updated", lock, lockmode);
1147         Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
1148         Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
1149         Assert(lock->nGranted <= lock->nRequested);
1150
1151         if (lock->nRequested == 0)
1152         {
1153                 /* ------------------
1154                  * if there's no one waiting in the queue,
1155                  * we just released the last lock on this object.
1156                  * Delete it from the lock table.
1157                  * ------------------
1158                  */
1159                 Assert(lockMethodTable->lockHash->hash == tag_hash);
1160                 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1161                                                                         (Pointer) &(lock->tag),
1162                                                                         HASH_REMOVE,
1163                                                                         &found);
1164                 Assert(lock && found);
1165                 wakeupNeeded = false;
1166         }
1167
1168         /*
1169          * Now fix the per-holder lock stats.
1170          */
1171         holder->holding[lockmode]--;
1172         holder->nHolding--;
1173         HOLDER_PRINT("LockRelease: updated", holder);
1174         Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
1175
1176         /*
1177          * If this was my last hold on this lock, delete my entry in the holder
1178          * table.
1179          */
1180         if (holder->nHolding == 0)
1181         {
1182                 HOLDER_PRINT("LockRelease: deleting", holder);
1183                 SHMQueueDelete(&holder->lockLink);
1184                 SHMQueueDelete(&holder->procLink);
1185                 holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
1186                                                                                 HASH_REMOVE_SAVED, &found);
1187                 if (!holder || !found)
1188                 {
1189                         SpinRelease(masterLock);
1190                         elog(NOTICE, "LockRelease: remove holder, table corrupted");
1191                         return FALSE;
1192                 }
1193         }
1194
1195         if (wakeupNeeded)
1196                 ProcLockWakeup(lockmethod, lock);
1197 #ifdef LOCK_DEBUG
1198         else if (LOCK_DEBUG_ENABLED(lock))
1199         elog(DEBUG, "LockRelease: no wakeup needed");
1200 #endif
1201
1202         SpinRelease(masterLock);
1203         return TRUE;
1204 }
1205
1206 /*
1207  * LockReleaseAll -- Release all locks in a process's lock list.
1208  *
1209  * Well, not really *all* locks.
1210  *
1211  * If 'allxids' is TRUE, all locks of the specified lock method are
1212  * released, regardless of transaction affiliation.
1213  *
1214  * If 'allxids' is FALSE, all locks of the specified lock method and
1215  * specified XID are released.
1216  */
1217 bool
1218 LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
1219                            bool allxids, TransactionId xid)
1220 {
1221         SHM_QUEUE  *procHolders = &(proc->procHolders);
1222         HOLDER     *holder;
1223         HOLDER     *nextHolder;
1224         SPINLOCK        masterLock;
1225         LOCKMETHODTABLE *lockMethodTable;
1226         int                     i,
1227                                 numLockModes;
1228         LOCK       *lock;
1229         bool            found;
1230
1231 #ifdef LOCK_DEBUG
1232         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1233                 elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
1234                          lockmethod, proc->pid);
1235 #endif
1236
1237         Assert(lockmethod < NumLockMethods);
1238         lockMethodTable = LockMethodTable[lockmethod];
1239         if (!lockMethodTable)
1240         {
1241                 elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
1242                 return FALSE;
1243         }
1244
1245         numLockModes = lockMethodTable->ctl->numLockModes;
1246         masterLock = lockMethodTable->ctl->masterLock;
1247
1248         SpinAcquire(masterLock);
1249
1250         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1251                                                                          offsetof(HOLDER, procLink));
1252
1253         while (holder)
1254         {
1255                 bool            wakeupNeeded = false;
1256
1257                 /* Get link first, since we may unlink/delete this holder */
1258                 nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1259                                                                                          offsetof(HOLDER, procLink));
1260
1261                 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1262
1263                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1264
1265                 /* Ignore items that are not of the lockmethod to be removed */
1266                 if (LOCK_LOCKMETHOD(*lock) != lockmethod)
1267                         goto next_item;
1268
1269                 /* If not allxids, ignore items that are of the wrong xid */
1270                 if (!allxids && xid != holder->tag.xid)
1271                         goto next_item;
1272
1273                 HOLDER_PRINT("LockReleaseAll", holder);
1274                 LOCK_PRINT("LockReleaseAll", lock, 0);
1275                 Assert(lock->nRequested >= 0);
1276                 Assert(lock->nGranted >= 0);
1277                 Assert(lock->nGranted <= lock->nRequested);
1278                 Assert(holder->nHolding >= 0);
1279                 Assert(holder->nHolding <= lock->nRequested);
1280
1281                 /* ------------------
1282                  * fix the general lock stats
1283                  * ------------------
1284                  */
1285                 if (lock->nRequested != holder->nHolding)
1286                 {
1287                         for (i = 1; i <= numLockModes; i++)
1288                         {
1289                                 Assert(holder->holding[i] >= 0);
1290                                 if (holder->holding[i] > 0)
1291                                 {
1292                                         lock->requested[i] -= holder->holding[i];
1293                                         lock->granted[i] -= holder->holding[i];
1294                                         Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
1295                                         if (lock->granted[i] == 0)
1296                                                 lock->grantMask &= BITS_OFF[i];
1297                                         /*
1298                                          * Read comments in LockRelease
1299                                          */
1300                                         if (!wakeupNeeded &&
1301                                                 lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
1302                                                 wakeupNeeded = true;
1303                                 }
1304                         }
1305                         lock->nRequested -= holder->nHolding;
1306                         lock->nGranted -= holder->nHolding;
1307                         Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
1308                         Assert(lock->nGranted <= lock->nRequested);
1309                 }
1310                 else
1311                 {
1312                         /* --------------
1313                          * set nRequested to zero so that we can garbage collect the lock
1314                          * down below...
1315                          * --------------
1316                          */
1317                         lock->nRequested = 0;
1318                         lock->nGranted = 0;
1319                         /* Fix the lock status, just for next LOCK_PRINT message. */
1320                         for (i = 1; i <= numLockModes; i++)
1321                         {
1322                                 Assert(lock->requested[i] == lock->granted[i]);
1323                                 lock->requested[i] = lock->granted[i] = 0;
1324                         }
1325                 }
1326                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1327
1328                 HOLDER_PRINT("LockReleaseAll: deleting", holder);
1329
1330                 /*
1331                  * Remove the holder entry from the linked lists
1332                  */
1333                 SHMQueueDelete(&holder->lockLink);
1334                 SHMQueueDelete(&holder->procLink);
1335
1336                 /*
1337                  * remove the holder entry from the hashtable
1338                  */
1339                 holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
1340                                                                                 (Pointer) holder,
1341                                                                                 HASH_REMOVE,
1342                                                                                 &found);
1343                 if (!holder || !found)
1344                 {
1345                         SpinRelease(masterLock);
1346                         elog(NOTICE, "LockReleaseAll: holder table corrupted");
1347                         return FALSE;
1348                 }
1349
1350                 if (!lock->nRequested)
1351                 {
1352                         /* --------------------
1353                          * We've just released the last lock, so garbage-collect the
1354                          * lock object.
1355                          * --------------------
1356                          */
1357                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1358                         Assert(lockMethodTable->lockHash->hash == tag_hash);
1359                         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1360                                                                                 (Pointer) &(lock->tag),
1361                                                                                 HASH_REMOVE, &found);
1362                         if ((!lock) || (!found))
1363                         {
1364                                 SpinRelease(masterLock);
1365                                 elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1366                                 return FALSE;
1367                         }
1368                 }
1369                 else if (wakeupNeeded)
1370                         ProcLockWakeup(lockmethod, lock);
1371
1372 next_item:
1373                 holder = nextHolder;
1374         }
1375
1376         SpinRelease(masterLock);
1377
1378 #ifdef LOCK_DEBUG
1379     if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1380         elog(DEBUG, "LockReleaseAll: done");
1381 #endif
1382
1383         return TRUE;
1384 }
1385
1386 int
1387 LockShmemSize(int maxBackends)
1388 {
1389         int                     size = 0;
1390
1391         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1392         size += MAXALIGN(maxBackends * sizeof(PROC));           /* each MyProc */
1393         size += MAXALIGN(maxBackends * sizeof(LOCKMETHODCTL));          /* each
1394                                                                                                                                  * lockMethodTable->ctl */
1395
1396         /* lockHash table */
1397         size += hash_estimate_size(NLOCKENTS(maxBackends),
1398                                                            SHMEM_LOCKTAB_KEYSIZE,
1399                                                            SHMEM_LOCKTAB_DATASIZE);
1400
1401         /* holderHash table */
1402         size += hash_estimate_size(NLOCKENTS(maxBackends),
1403                                                            SHMEM_HOLDERTAB_KEYSIZE,
1404                                                            SHMEM_HOLDERTAB_DATASIZE);
1405
1406         /*
1407          * Since the lockHash entry count above is only an estimate, add 10%
1408          * safety margin.
1409          */
1410         size += size / 10;
1411
1412         return size;
1413 }
1414
1415 /*
1416  * DeadLockCheck -- Checks for deadlocks for a given process
1417  *
1418  * This code takes a list of locks a process holds, and the lock that
1419  * the process is sleeping on, and tries to find if any of the processes
1420  * waiting on its locks hold the lock it is waiting for.  If no deadlock
1421  * is found, it goes on to look at all the processes waiting on their locks.
1422  *
1423  * We can't block on user locks, so no sense testing for deadlock
1424  * because there is no blocking, and no timer for the block.  So,
1425  * only look at regular locks.
1426  *
1427  * We have already locked the master lock before being called.
1428  */
/*
 * DeadLockCheck -- recursive deadlock search starting from thisProc.
 *
 * thisProc: the process whose procHolders list is scanned.  The outermost
 *           call is recognized by thisProc == MyProc; recursive calls pass
 *           a process found waiting on one of thisProc's locks.
 * findlock: the lock every level of the recursion compares against.
 *           NOTE(review): presumably the lock MyProc is waiting for, since
 *           the first-level test uses MyProc->waitLockMode -- confirm
 *           against the caller.
 *
 * Returns true if a deadlock was detected, false otherwise.
 *
 * Side effects: may call GrantLock() and ProcWakeup() on MyProc or on a
 * waiting process, so wait queues can change during the scan.  The set of
 * already-visited processes lives in function-static storage that is reset
 * only by the outermost call.
 */
1429 bool
1430 DeadLockCheck(PROC *thisProc, LOCK *findlock)
1431 {
1432         PROC       *waitProc;
1433         PROC_QUEUE *waitQueue;
1434         SHM_QUEUE  *procHolders = &(thisProc->procHolders);
1435         HOLDER     *holder;
1436         HOLDER     *nextHolder;
1437         LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
1438         LOCK       *lock;
1439         int                     i,
1440                                 j;
1441         bool            first_run = (thisProc == MyProc);       /* true only at the outermost call */
1442
             /*
              * Processes already visited in the current search.  Being static,
              * these are shared by all recursion levels.
              */
1443         static PROC *checked_procs[MAXBACKENDS];
1444         static int      nprocs;
1445
1446         /* initialize at start of recursion */
1447         if (first_run)
1448         {
1449                 checked_procs[0] = thisProc;
1450                 nprocs = 1;
1451         }
1452
1453         /*
1454          * Scan over all the locks held/awaited by thisProc.
1455          */
1456         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1457                                                                          offsetof(HOLDER, procLink));
1458
1459         while (holder)
1460         {
1461                 /* Get link first, since we may unlink/delete this holder */
1462                 nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1463                                                                                          offsetof(HOLDER, procLink));
1464
1465                 Assert(holder->tag.proc == MAKE_OFFSET(thisProc));
1466
1467                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1468
1469                 /* Ignore user locks */
1470                 if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD)
1471                         goto nxtl;
1472
1473                 HOLDER_PRINT("DeadLockCheck", holder);
1474                 LOCK_PRINT("DeadLockCheck", lock, 0);
1475
1476                 /*
1477                  * waitLock is always in procHolders of waiting proc, if !first_run
1478                  * then upper caller will handle waitProcs queue of waitLock.
1479                  */
1480                 if (thisProc->waitLock == lock && !first_run)
1481                         goto nxtl;
1482
1483                 /*
1484                  * If we found proc holding findlock and sleeping on some my other
1485                  * lock then we have to check does it block me or another waiters.
1486                  */
1487                 if (lock == findlock && !first_run)
1488                 {
1489                         int                     lm;
1490
1491                         Assert(holder->nHolding > 0);
                             /*
                              * Report deadlock if any mode held through this holder has a
                              * conflict mask that intersects findlock->waitMask.
                              * NOTE(review): waitMask is presumably the set of modes
                              * currently being waited for on the lock -- confirm.
                              */
1492                         for (lm = 1; lm <= lockctl->numLockModes; lm++)
1493                         {
1494                                 if (holder->holding[lm] > 0 &&
1495                                         lockctl->conflictTab[lm] & findlock->waitMask)
1496                                         return true;
1497                         }
1498
1499                         /*
1500                          * Else - get the next lock from thisProc's procHolders
1501                          */
1502                         goto nxtl;
1503                 }
1504
1505                 waitQueue = &(lock->waitProcs);
1506                 waitProc = (PROC *) MAKE_PTR(waitQueue->links.next);
1507
1508                 /*
1509                  * Inner loop scans over all processes waiting for this lock.
1510                  *
1511                  * NOTE: loop must count down because we want to examine each item
1512                  * in the queue even if waitQueue->size decreases due to waking up
1513                  * some of the processes.
1514                  */
1515                 for (i = waitQueue->size; --i >= 0; )
1516                 {
1517                         Assert(waitProc->waitLock == lock);
1518                         if (waitProc == thisProc)
1519                         {
1520                                 /* This should only happen at first level */
1521                                 Assert(waitProc == MyProc);
1522                                 goto nextWaitProc;
1523                         }
1524                         if (lock == findlock)           /* first_run also true */
1525                         {
1526                                 /*
1527                                  * If I'm blocked by his heldLocks...
1528                                  */
1529                                 if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks)
1530                                 {
1531                                         /* and he blocked by me -> deadlock */
1532                                         if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
1533                                                 return true;
1534                                         /* we shouldn't look at procHolders of our blockers */
1535                                         goto nextWaitProc;
1536                                 }
1537
1538                                 /*
1539                                  * If he isn't blocked by me and we request
1540                                  * non-conflicting lock modes - no deadlock here because
1541                                  * he isn't blocked by me in any sense (explicitly or
1542                                  * implicitly). Note that we don't do like test if
1543                                  * !first_run (when thisProc is holder and non-waiter on
1544                                  * lock) and so we call DeadLockCheck below for every
1545                                  * waitProc in thisProc->procHolders, even for waitProc-s
1546                                  * un-blocked by thisProc. Should we? This could save us
1547                                  * some time...
1548                                  */
1549                                 if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) &&
1550                                         !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
1551                                         goto nextWaitProc;
1552                         }
1553
1554                         /*
1555                          * Skip this waiter if already checked.
1556                          */
1557                         for (j = 0; j < nprocs; j++)
1558                         {
1559                                 if (checked_procs[j] == waitProc)
1560                                         goto nextWaitProc;
1561                         }
1562
1563                         /* Recursively check this process's procHolders. */
                             /*
                              * The array bound is guarded only by this Assert.
                              * NOTE(review): presumably the duplicate check above keeps
                              * nprocs from exceeding MAXBACKENDS -- confirm.
                              */
1564                         Assert(nprocs < MAXBACKENDS);
1565                         checked_procs[nprocs++] = waitProc;
1566
1567                         if (DeadLockCheck(waitProc, findlock))
1568                         {
1569                                 int                     heldLocks;
1570
1571                                 /*
1572                                  * Ok, but is waitProc waiting for me (thisProc) ?
1573                                  */
1574                                 if (thisProc->waitLock == lock)
1575                                 {
1576                                         Assert(first_run);
1577                                         heldLocks = thisProc->heldLocks;
1578                                 }
1579                                 else
1580                                 {
1581                                         /* should we cache heldLocks to speed this up? */
1582                                         heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc);
1583                                         Assert(heldLocks != 0);
1584                                 }
1585                                 if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks)
1586                                 {
1587                                         /*
1588                                          * Last attempt to avoid deadlock: try to wakeup myself.
1589                                          */
1590                                         if (first_run)
1591                                         {
1592                                                 if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
1593                                                                                                  MyProc->waitLockMode,
1594                                                                                                  MyProc->waitLock,
1595                                                                                                  MyProc->waitHolder,
1596                                                                                                  MyProc,
1597                                                                                                  NULL) == STATUS_OK)
1598                                                 {
                                                             /*
                                                              * No conflict remains for our own request:
                                                              * grant it, wake ourselves and report no
                                                              * deadlock.
                                                              */
1599                                                         GrantLock(MyProc->waitLock,
1600                                                                           MyProc->waitHolder,
1601                                                                           MyProc->waitLockMode);
1602                                                         ProcWakeup(MyProc, NO_ERROR);
1603                                                         return false;
1604                                                 }
1605                                         }
1606                                         return true;
1607                                 }
1608
1609                                 /*
1610                                  * Hell! Is he blocked by any (other) holder ?
1611                                  */
1612                                 if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
1613                                                                                  waitProc->waitLockMode,
1614                                                                                  lock,
1615                                                                                  waitProc->waitHolder,
1616                                                                                  waitProc,
1617                                                                                  NULL) != STATUS_OK)
1618                                 {
1619                                         /*
1620                                          * Blocked by others - no deadlock...
1621                                          */
1622                                         LOCK_PRINT("DeadLockCheck: blocked by others",
1623                                                            lock, waitProc->waitLockMode);
1624                                         goto nextWaitProc;
1625                                 }
1626
1627                                 /*
1628                                  * Well - wakeup this guy! This is the case of
1629                                  * implicit blocking: thisProc blocked someone who
1630                                  * blocked waitProc by the fact that he/someone is
1631                                  * already waiting for lock.  We do this for
1632                                  * anti-starving.
1633                                  */
1634                                 GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode);
1635                                 waitProc = ProcWakeup(waitProc, NO_ERROR);
1636                                 /*
1637                                  * Use next-proc link returned by ProcWakeup, since this
1638                                  * proc's own links field is now cleared.
1639                                  */
1640                                 continue;
1641                         }
1642
1643 nextWaitProc:
                             /* Advance to the next waiter in this lock's wait queue. */
1644                         waitProc = (PROC *) MAKE_PTR(waitProc->links.next);
1645                 }
1646
1647 nxtl:
                     /* Advance using the link saved at the top of the loop. */
1648                 holder = nextHolder;
1649         }
1650
1651         /* if we got here, no deadlock */
1652         return false;
1653 }
1654
1655 #ifdef LOCK_DEBUG
1656 /*
1657  * Dump all locks in the proc->procHolders list.
1658  *
1659  * Must have already acquired the masterLock.
1660  */
1661 void
1662 DumpLocks(void)
1663 {
1664         SHMEM_OFFSET location;
1665         PROC       *proc;
1666         SHM_QUEUE  *procHolders;
1667         HOLDER     *holder;
1668         LOCK       *lock;
1669         int                     lockmethod = DEFAULT_LOCKMETHOD;
1670         LOCKMETHODTABLE *lockMethodTable;
1671
1672         ShmemPIDLookup(MyProcPid, &location);
1673         if (location == INVALID_OFFSET)
1674                 return;
1675         proc = (PROC *) MAKE_PTR(location);
1676         if (proc != MyProc)
1677                 return;
1678         procHolders = &proc->procHolders;
1679
1680         Assert(lockmethod < NumLockMethods);
1681         lockMethodTable = LockMethodTable[lockmethod];
1682         if (!lockMethodTable)
1683                 return;
1684
1685         if (proc->waitLock)
1686                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1687
1688         holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
1689                                                                          offsetof(HOLDER, procLink));
1690
1691         while (holder)
1692         {
1693                 Assert(holder->tag.proc == MAKE_OFFSET(proc));
1694
1695                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1696
1697                 HOLDER_PRINT("DumpLocks", holder);
1698                 LOCK_PRINT("DumpLocks", lock, 0);
1699
1700                 holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
1701                                                                                  offsetof(HOLDER, procLink));
1702         }
1703 }
1704
1705 /*
1706  * Dump all postgres locks. Must have already acquired the masterLock.
1707  */
1708 void
1709 DumpAllLocks(void)
1710 {
1711         SHMEM_OFFSET location;
1712         PROC       *proc;
1713         HOLDER     *holder = NULL;
1714         LOCK       *lock;
1715         int                     pid;
1716         int                     lockmethod = DEFAULT_LOCKMETHOD;
1717         LOCKMETHODTABLE *lockMethodTable;
1718         HTAB       *holderTable;
1719         HASH_SEQ_STATUS status;
1720
1721         pid = getpid();
1722         ShmemPIDLookup(pid, &location);
1723         if (location == INVALID_OFFSET)
1724                 return;
1725         proc = (PROC *) MAKE_PTR(location);
1726         if (proc != MyProc)
1727                 return;
1728
1729         Assert(lockmethod < NumLockMethods);
1730         lockMethodTable = LockMethodTable[lockmethod];
1731         if (!lockMethodTable)
1732                 return;
1733
1734         holderTable = lockMethodTable->holderHash;
1735
1736         if (proc->waitLock)
1737                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1738
1739         hash_seq_init(&status, holderTable);
1740         while ((holder = (HOLDER *) hash_seq_search(&status)) &&
1741                    (holder != (HOLDER *) TRUE))
1742         {
1743                 HOLDER_PRINT("DumpAllLocks", holder);
1744
1745                 if (holder->tag.lock)
1746                 {
1747                         lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1748                         LOCK_PRINT("DumpAllLocks", lock, 0);
1749                 }
1750                 else
1751                         elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
1752         }
1753 }
1754
1755 #endif /* LOCK_DEBUG */