1 /*-------------------------------------------------------------------------
2  *
3  * lock.c
4  *        POSTGRES low-level lock mechanism
5  *
6  * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  *
10  * IDENTIFICATION
11  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.75 2001/01/02 04:33:16 tgl Exp $
12  *
13  * NOTES
14  *        Outside modules can create a lock table and acquire/release
15  *        locks.  A lock table is a shared memory hash table.  When
16  *        a process tries to acquire a lock of a type that conflicts
17  *        with existing locks, it is put to sleep using the routines
18  *        in storage/lmgr/proc.c.
19  *
20  *        For the most part, this code should be invoked via lmgr.c
21  *        or another lock-management module, not directly.
22  *
23  *      Interface:
24  *
25  *      LockAcquire(), LockRelease(), LockMethodTableInit(),
26  *      LockMethodTableRename(), LockReleaseAll(),
27  *      LockResolveConflicts(), GrantLock()
28  *
29  *-------------------------------------------------------------------------
30  */
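
/*
 * A minimal usage sketch of the interface above, roughly as a caller such
 * as lmgr.c might drive it.  The tag fields and AccessShareLock come from
 * the surrounding lock code; "relid" and the error wording are illustrative
 * assumptions, not copied from lmgr.c:
 *
 *		LOCKTAG		tag;
 *
 *		MemSet(&tag, 0, sizeof(tag));
 *		tag.relId = relid;
 *		tag.dbId = MyDatabaseId;
 *		tag.objId.blkno = InvalidBlockNumber;
 *
 *		if (!LockAcquire(DEFAULT_LOCKMETHOD, &tag,
 *						 GetCurrentTransactionId(), AccessShareLock))
 *			elog(ERROR, "LockAcquire failed");
 *		...
 *		LockRelease(DEFAULT_LOCKMETHOD, &tag,
 *					GetCurrentTransactionId(), AccessShareLock);
 */
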
31 #include <sys/types.h>
32 #include <unistd.h>
33 #include <signal.h>
34
35 #include "postgres.h"
36
37 #include "access/xact.h"
38 #include "miscadmin.h"
39 #include "storage/proc.h"
40 #include "utils/memutils.h"
41 #include "utils/ps_status.h"
42
43 static int      WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
44                                            LOCK *lock, HOLDER *holder);
45 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
46                                                          int *myHolders);
47 static int LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
48
49 static char *lock_types[] =
50 {
51         "INVALID",
52         "AccessShareLock",
53         "RowShareLock",
54         "RowExclusiveLock",
55         "ShareLock",
56         "ShareRowExclusiveLock",
57         "ExclusiveLock",
58         "AccessExclusiveLock"
59 };
60
61 static char *DeadLockMessage = "Deadlock detected.\n\tSee the lock(l) manual page for a possible cause.";
62
63
64 #ifdef LOCK_DEBUG
65
66 /*------
67  * The following configuration options are available for lock debugging:
68  *
69  *     trace_locks      -- give a bunch of output about what's going on in this file
70  *     trace_userlocks  -- same but for user locks
71  *     trace_lock_oidmin-- do not trace locks for tables below this oid
72  *                         (use to avoid output on system tables)
73  *     trace_lock_table -- trace locks on this table (oid) unconditionally
74  *     debug_deadlocks  -- currently dumps locks at untimely occasions ;)
75  * Furthermore, but in storage/ipc/spin.c:
76  *     trace_spinlocks  -- trace spinlocks (pretty useless)
77  *
78  * Define LOCK_DEBUG at compile time to get all this enabled.
79  */
80
81 int  Trace_lock_oidmin  = BootstrapObjectIdData;
82 bool Trace_locks        = false;
83 bool Trace_userlocks    = false;
84 int  Trace_lock_table   = 0;
85 bool Debug_deadlocks    = false;
86
87
88 inline static bool
89 LOCK_DEBUG_ENABLED(const LOCK * lock)
90 {
91     return
92         (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
93           || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
94          && (lock->tag.relId >= Trace_lock_oidmin))
95         || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
96 }
97
98
99 inline static void
100 LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type)
101 {
102         if (LOCK_DEBUG_ENABLED(lock))
103         elog(DEBUG,
104              "%s: lock(%lx) tbl(%d) rel(%u) db(%u) obj(%u) mask(%x) "
105              "hold(%d,%d,%d,%d,%d,%d,%d)=%d "
106              "act(%d,%d,%d,%d,%d,%d,%d)=%d wait(%d) type(%s)",
107              where, MAKE_OFFSET(lock),
108              lock->tag.lockmethod, lock->tag.relId, lock->tag.dbId,
109              lock->tag.objId.blkno, lock->mask,
110              lock->holders[1], lock->holders[2], lock->holders[3], lock->holders[4],
111              lock->holders[5], lock->holders[6], lock->holders[7], lock->nHolding,
112              lock->activeHolders[1], lock->activeHolders[2], lock->activeHolders[3],
113              lock->activeHolders[4], lock->activeHolders[5], lock->activeHolders[6],
114              lock->activeHolders[7], lock->nActive,
115              lock->waitProcs.size, lock_types[type]);
116 }
117
118
119 inline static void
120 HOLDER_PRINT(const char * where, const HOLDER * holderP)
121 {
122         if (
123         (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
124           || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
125          && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= Trace_lock_oidmin))
126                 || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
127         )
128         elog(DEBUG,
129              "%s: holder(%lx) lock(%lx) tbl(%d) pid(%d) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
130              where, MAKE_OFFSET(holderP), holderP->tag.lock,
131                          HOLDER_LOCKMETHOD(*(holderP)),
132              holderP->tag.pid, holderP->tag.xid,
133              holderP->holders[1], holderP->holders[2], holderP->holders[3], holderP->holders[4],
134              holderP->holders[5], holderP->holders[6], holderP->holders[7], holderP->nHolding);
135 }
136
137 #else  /* not LOCK_DEBUG */
138
139 #define LOCK_PRINT(where, lock, type)
140 #define HOLDER_PRINT(where, holderP)
141
142 #endif /* not LOCK_DEBUG */
143
144
145
146 SPINLOCK        LockMgrLock;            /* in Shmem or created in
147                                                                  * CreateSpinlocks() */
148
149 /* This is to simplify/speed up some bit arithmetic */
150
151 static LOCKMASK BITS_OFF[MAX_LOCKMODES];
152 static LOCKMASK BITS_ON[MAX_LOCKMODES];
153
154 /* -----------------
155  * Disable flag
156  * -----------------
157  */
158 static bool LockingIsDisabled;
159
160 /* -------------------
161  * map from lockmethod to the lock table structure
162  * -------------------
163  */
164 static LOCKMETHODTABLE *LockMethodTable[MAX_LOCK_METHODS];
165
166 static int      NumLockMethods;
167
168 /* -------------------
169  * InitLocks -- Init the lock module.  Create a private data
170  *              structure for constructing conflict masks.
171  * -------------------
172  */
173 void
174 InitLocks(void)
175 {
176         int                     i;
177         int                     bit;
178
179         bit = 1;
180         for (i = 0; i < MAX_LOCKMODES; i++, bit <<= 1)
181         {
182                 BITS_ON[i] = bit;
183                 BITS_OFF[i] = ~bit;
184         }
185 }
186
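
/*
 * For example, after InitLocks() has run, lock mode 3 maps to bit 3:
 *
 *		BITS_ON[3]  == 1 << 3 == 0x08
 *		BITS_OFF[3] == ~0x08
 *
 * so GrantLock() records a held mode with "lock->mask |= BITS_ON[lockmode]"
 * and LockRelease() clears it with "lock->mask &= BITS_OFF[lockmode]" once
 * no active holders of that mode remain.
 */
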
187 /* -------------------
188  * LockDisable -- sets LockingIsDisabled flag to TRUE or FALSE.
189  * ------------------
190  */
191 void
192 LockDisable(bool status)
193 {
194         LockingIsDisabled = status;
195 }
196
197 /* -----------------
198  * Boolean function to determine current locking status
199  * -----------------
200  */
201 bool
202 LockingDisabled(void)
203 {
204         return LockingIsDisabled;
205 }
206
207
208 /*
209  * LockMethodInit -- initialize the lock table's lock type
210  *              structures
211  *
212  * Notes: just copying.  Should only be called once.
213  */
214 static void
215 LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
216                            LOCKMASK *conflictsP,
217                            int *prioP,
218                            int numModes)
219 {
220         int                     i;
221
222         lockMethodTable->ctl->numLockModes = numModes;
223         numModes++;
224         for (i = 0; i < numModes; i++, prioP++, conflictsP++)
225         {
226                 lockMethodTable->ctl->conflictTab[i] = *conflictsP;
227                 lockMethodTable->ctl->prio[i] = *prioP;
228         }
229 }
230
231 /*
232  * LockMethodTableInit -- initialize a lock table structure
233  *
234  * Notes:
235  *              (a) a lock table has four separate entries in the shmem index
236  *              table.  This is because every shared hash table and spinlock
237  *              has its name stored in the shmem index at its creation.  It
238  *              is wasteful, in this case, but not much space is involved.
239  *
240  * NOTE: data structures allocated here are allocated permanently, using
241  * TopMemoryContext and shared memory.  We don't ever release them anyway,
242  * and in normal multi-backend operation the lock table structures set up
243  * by the postmaster are inherited by each backend, so they must be in
244  * TopMemoryContext.
245  */
246 LOCKMETHOD
247 LockMethodTableInit(char *tabName,
248                                         LOCKMASK *conflictsP,
249                                         int *prioP,
250                                         int numModes,
251                                         int maxBackends)
252 {
253         LOCKMETHODTABLE *lockMethodTable;
254         char       *shmemName;
255         HASHCTL         info;
256         int                     hash_flags;
257         bool            found;
258         long            init_table_size,
259                                 max_table_size;
260
261         if (numModes > MAX_LOCKMODES)
262         {
263                 elog(NOTICE, "LockMethodTableInit: too many lock types %d greater than %d",
264                          numModes, MAX_LOCKMODES);
265                 return INVALID_LOCKMETHOD;
266         }
267
268         /* Compute init/max size to request for lock hashtables */
269         max_table_size = NLOCKENTS(maxBackends);
270         init_table_size = max_table_size / 10;
271
272         /* Allocate a string for the shmem index table lookups. */
273         /* This is just temp space in this routine, so palloc is OK. */
274         shmemName = (char *) palloc(strlen(tabName) + 32);
275
276         /* each lock table has a non-shared, permanent header */
277         lockMethodTable = (LOCKMETHODTABLE *)
278                 MemoryContextAlloc(TopMemoryContext, sizeof(LOCKMETHODTABLE));
279
280         /* ------------------------
281          * find/acquire the spinlock for the table
282          * ------------------------
283          */
284         SpinAcquire(LockMgrLock);
285
286         /* -----------------------
287          * allocate a control structure from shared memory or attach to it
288          * if it already exists.
289          * -----------------------
290          */
291         sprintf(shmemName, "%s (ctl)", tabName);
292         lockMethodTable->ctl = (LOCKMETHODCTL *)
293                 ShmemInitStruct(shmemName, sizeof(LOCKMETHODCTL), &found);
294
295         if (!lockMethodTable->ctl)
296                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
297
298         /* -------------------
299          * no zero-th table
300          * -------------------
301          */
302         NumLockMethods = 1;
303
304         /* ----------------
305          * we're first - initialize
306          * ----------------
307          */
308         if (!found)
309         {
310                 MemSet(lockMethodTable->ctl, 0, sizeof(LOCKMETHODCTL));
311                 lockMethodTable->ctl->masterLock = LockMgrLock;
312                 lockMethodTable->ctl->lockmethod = NumLockMethods;
313         }
314
315         /* --------------------
316          * other modules refer to the lock table by a lockmethod ID
317          * --------------------
318          */
319         LockMethodTable[NumLockMethods] = lockMethodTable;
320         NumLockMethods++;
321         Assert(NumLockMethods <= MAX_LOCK_METHODS);
322
323         /* ----------------------
324          * allocate a hash table for LOCK structs.  This is used
325          * to store per-locked-object information.
326          * ----------------------
327          */
328         info.keysize = SHMEM_LOCKTAB_KEYSIZE;
329         info.datasize = SHMEM_LOCKTAB_DATASIZE;
330         info.hash = tag_hash;
331         hash_flags = (HASH_ELEM | HASH_FUNCTION);
332
333         sprintf(shmemName, "%s (lock hash)", tabName);
334         lockMethodTable->lockHash = ShmemInitHash(shmemName,
335                                                                                           init_table_size,
336                                                                                           max_table_size,
337                                                                                           &info,
338                                                                                           hash_flags);
339
340         if (!lockMethodTable->lockHash)
341                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
342         Assert(lockMethodTable->lockHash->hash == tag_hash);
343
344         /* -------------------------
345          * allocate a hash table for HOLDER structs.  This is used
346          * to store per-lock-holder information.
347          * -------------------------
348          */
349         info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
350         info.datasize = SHMEM_HOLDERTAB_DATASIZE;
351         info.hash = tag_hash;
352         hash_flags = (HASH_ELEM | HASH_FUNCTION);
353
354         sprintf(shmemName, "%s (holder hash)", tabName);
355         lockMethodTable->holderHash = ShmemInitHash(shmemName,
356                                                                                                 init_table_size,
357                                                                                                 max_table_size,
358                                                                                                 &info,
359                                                                                                 hash_flags);
360
361         if (!lockMethodTable->holderHash)
362                 elog(FATAL, "LockMethodTableInit: couldn't initialize %s", tabName);
363
364         /* init ctl data structures */
365         LockMethodInit(lockMethodTable, conflictsP, prioP, numModes);
366
367         SpinRelease(LockMgrLock);
368
369         pfree(shmemName);
370
371         return lockMethodTable->ctl->lockmethod;
372 }
373
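
/*
 * A minimal registration sketch, assuming a hypothetical two-mode lock
 * method.  The conflict and priority values below are purely illustrative;
 * they are not the tables the standard lock manager actually passes in.
 * The zero'th array entries are unused, and conflictTab[i] has bit (1 << j)
 * set when mode i conflicts with mode j:
 *
 *		static LOCKMASK myConflicts[] = { 0, (1 << 2), (1 << 1) | (1 << 2) };
 *		static int      myPrios[]     = { 0, 1, 2 };
 *		LOCKMETHOD      myMethod;
 *
 *		myMethod = LockMethodTableInit("MyLockTable", myConflicts, myPrios,
 *									   2, maxBackends);
 *		if (myMethod == INVALID_LOCKMETHOD)
 *			elog(FATAL, "could not initialize my lock table");
 */
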
374 /*
375  * LockMethodTableRename -- allocate another lockmethod ID to the same
376  *              lock table.
377  *
378  * NOTES: Both the lock module and the lock chain (lchain.c)
379  *              module use table IDs to distinguish between different
380  *              kinds of locks.  Short-term and long-term locks look
381  *              the same to the lock table, but are handled differently
382  *              by the lock chain manager.  This function allows the
383  *              client to use different lockmethods when acquiring/releasing
384  *              short-term and long-term locks, yet store them all in one hashtable.
385  */
386
387 LOCKMETHOD
388 LockMethodTableRename(LOCKMETHOD lockmethod)
389 {
390         LOCKMETHOD      newLockMethod;
391
392         if (NumLockMethods >= MAX_LOCK_METHODS)
393                 return INVALID_LOCKMETHOD;
394         if (LockMethodTable[lockmethod] == INVALID_LOCKMETHOD)
395                 return INVALID_LOCKMETHOD;
396
397         /* other modules refer to the lock table by a lockmethod ID */
398         newLockMethod = NumLockMethods;
399         NumLockMethods++;
400
401         LockMethodTable[newLockMethod] = LockMethodTable[lockmethod];
402         return newLockMethod;
403 }
404
405 /*
406  * LockAcquire -- Check for lock conflicts, sleep if conflict found,
407  *              set lock if/when no conflicts.
408  *
409  * Returns: TRUE if parameters are correct, FALSE otherwise.
410  *
411  * Side Effects: The lock is always acquired.  No way to abort
412  *              a lock acquisition other than aborting the transaction.
413  *              Lock is recorded in the lock chain.
414  *
415  *
416  * Note on User Locks:
417  *
418  *              User locks are handled totally on the application side as
419  *              long-term cooperative locks which extend beyond the normal
420  *              transaction boundaries.  Their purpose is to indicate to an
421  *              application that someone is `working' on an item.  So it is
422  *              possible to put a user lock on a tuple's oid, retrieve the
423  *              tuple, work on it for an hour and then update it and remove
424  *              the lock.  While the lock is active other clients can still
425  *              read and write the tuple but they can be aware that it has
426  *              been locked at the application level by someone.
427  *              User locks use lock tags made of a uint16 and a uint32, for
428  *              example 0 and a tuple oid, or any other arbitrary pair of
429  *              numbers following a convention established by the application.
430  *              In this sense tags don't refer to tuples or database entities.
431  *              User locks and normal locks are completely orthogonal and
432  *              they don't interfere with each other, so it is possible
433  *              to acquire a normal lock on a user-locked tuple or user-lock
434  *              a tuple for which a normal write lock already exists.
435  *              User locks are always non-blocking; therefore they are never
436  *              acquired if already held by another process.  They must be
437  *              released explicitly by the application but they are released
438  *              automatically when a backend terminates.
439  *              They are indicated by a lockmethod 2 which is an alias for the
440  *              normal lock table, and are distinguished from normal locks
441  *              by the following differences:
442  *
443  *                                                                              normal lock             user lock
444  *
445  *              lockmethod                                              1                               2
446  *              tag.dbId                                                database oid    database oid
447  *              tag.relId                                               rel oid or 0    0
448  *              tag.objId                                               block id                lock id2
449  *                                                                              or xact id
450  *              tag.offnum                                              0                               lock id1
451  *              xid.pid                                                 backend pid             backend pid
452  *              xid.xid                                                 xid or 0                0
453  *              persistence                                             transaction             user or backend
454  *                                                                              or backend
455  *
456  *              The lockmode parameter can have the same values as for normal locks,
457  *              although probably only WRITE_LOCK has any practical use.
458  *
459  *                                                                                                              DZ - 22 Nov 1997
460  */
461
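
/*
 * A minimal user-lock sketch following the table above.  The object id
 * 12345 and the ExclusiveLock mode are arbitrary illustrations, and the
 * xid of 0 (InvalidTransactionId) matches the table entry for user locks;
 * the real consumers of this interface live in contrib code:
 *
 *		LOCKTAG		tag;
 *
 *		MemSet(&tag, 0, sizeof(tag));
 *		tag.dbId = MyDatabaseId;
 *		tag.relId = 0;
 *		tag.objId.blkno = 12345;
 *
 *		if (!LockAcquire(USER_LOCKMETHOD, &tag, InvalidTransactionId,
 *						 ExclusiveLock))
 *			... lock is busy; user locks never block, so just report it ...
 *
 *		... later, possibly in another transaction ...
 *		LockRelease(USER_LOCKMETHOD, &tag, InvalidTransactionId, ExclusiveLock);
 */
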
462 bool
463 LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
464                         TransactionId xid, LOCKMODE lockmode)
465 {
466         HOLDER     *holder;
467         HOLDERTAG       holdertag;
468         HTAB       *holderTable;
469         bool            found;
470         LOCK       *lock;
471         SPINLOCK        masterLock;
472         LOCKMETHODTABLE *lockMethodTable;
473         int                     status;
474         int                     myHolders[MAX_LOCKMODES];
475         int                     i;
476
477 #ifdef LOCK_DEBUG
478         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
479                 elog(DEBUG, "LockAcquire: user lock [%u] %s",
480                          locktag->objId.blkno, lock_types[lockmode]);
481 #endif
482
483         /* ???????? This must be changed when short-term locks come into use */
484         locktag->lockmethod = lockmethod;
485
486         Assert(lockmethod < NumLockMethods);
487         lockMethodTable = LockMethodTable[lockmethod];
488         if (!lockMethodTable)
489         {
490                 elog(NOTICE, "LockAcquire: bad lock table %d", lockmethod);
491                 return FALSE;
492         }
493
494         if (LockingIsDisabled)
495                 return TRUE;
496
497         masterLock = lockMethodTable->ctl->masterLock;
498
499         SpinAcquire(masterLock);
500
501         /*
502          * Find or create a lock with this tag
503          */
504         Assert(lockMethodTable->lockHash->hash == tag_hash);
505         lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
506                                                                 HASH_ENTER, &found);
507         if (!lock)
508         {
509                 SpinRelease(masterLock);
510                 elog(FATAL, "LockAcquire: lock table %d is corrupted", lockmethod);
511                 return FALSE;
512         }
513
514         /* --------------------
515          * if it's a new lock object, initialize it
516          * --------------------
517          */
518         if (!found)
519         {
520                 lock->mask = 0;
521                 lock->nHolding = 0;
522                 lock->nActive = 0;
523                 MemSet((char *) lock->holders, 0, sizeof(int) * MAX_LOCKMODES);
524                 MemSet((char *) lock->activeHolders, 0, sizeof(int) * MAX_LOCKMODES);
525                 ProcQueueInit(&(lock->waitProcs));
526                 LOCK_PRINT("LockAcquire: new", lock, lockmode);
527         }
528         else
529         {
530                 LOCK_PRINT("LockAcquire: found", lock, lockmode);
531                 Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
532                 Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
533                 Assert(lock->nActive <= lock->nHolding);
534         }
535
536         /* ------------------
537          * Create the hash key for the holder table.
538          * ------------------
539          */
540         MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
541         holdertag.lock = MAKE_OFFSET(lock);
542         holdertag.pid = MyProcPid;
543         TransactionIdStore(xid, &holdertag.xid);
544
545         /*
546          * Find or create a holder entry with this tag
547          */
548         holderTable = lockMethodTable->holderHash;
549         holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
550                                                                         HASH_ENTER, &found);
551         if (!holder)
552         {
553                 SpinRelease(masterLock);
554                 elog(NOTICE, "LockAcquire: holder table corrupted");
555                 return FALSE;
556         }
557
558         /*
559          * If new, initialize the new entry
560          */
561         if (!found)
562         {
563                 holder->nHolding = 0;
564                 MemSet((char *) holder->holders, 0, sizeof(int) * MAX_LOCKMODES);
565                 ProcAddLock(&holder->queue);
566                 HOLDER_PRINT("LockAcquire: new", holder);
567         }
568         else
569         {
570                 HOLDER_PRINT("LockAcquire: found", holder);
571                 Assert((holder->nHolding > 0) && (holder->holders[lockmode] >= 0));
572                 Assert(holder->nHolding <= lock->nActive);
573
574 #ifdef CHECK_DEADLOCK_RISK
575                 /*
576                  * Issue warning if we already hold a lower-level lock on this
577                  * object and do not hold a lock of the requested level or higher.
578                  * This indicates a deadlock-prone coding practice (eg, we'd have
579                  * a deadlock if another backend were following the same code path
580                  * at about the same time).
581                  *
582                  * This is not enabled by default, because it may generate log entries
583                  * about user-level coding practices that are in fact safe in context.
584                  * It can be enabled to help find system-level problems.
585                  *
586                  * XXX Doing numeric comparison on the lockmodes is a hack;
587                  * it'd be better to use a table.  For now, though, this works.
588                  */
589                 for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
590                 {
591                         if (holder->holders[i] > 0)
592                         {
593                                 if (i >= (int) lockmode)
594                                         break;          /* safe: we have a lock >= req level */
595                                 elog(DEBUG, "Deadlock risk: raising lock level"
596                                          " from %s to %s on object %u/%u/%u",
597                                          lock_types[i], lock_types[lockmode],
598                                          lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
599                                 break;
600                         }
601                 }
602 #endif /* CHECK_DEADLOCK_RISK */
603         }
604
605         /* ----------------
606          * lock->nHolding and lock->holders count the total number of holders
607          * either holding or waiting for the lock, so increment those immediately.
608          * The other counts don't increment till we get the lock.
609          * ----------------
610          */
611         lock->nHolding++;
612         lock->holders[lockmode]++;
613         Assert((lock->nHolding > 0) && (lock->holders[lockmode] > 0));
614
615         /* --------------------
616          * If I'm the only one holding any lock on this object, then there
617          * cannot be a conflict. The same is true if I already hold this lock.
618          * --------------------
619          */
620         if (holder->nHolding == lock->nActive || holder->holders[lockmode] != 0)
621         {
622                 GrantLock(lock, holder, lockmode);
623                 HOLDER_PRINT("LockAcquire: owning", holder);
624                 SpinRelease(masterLock);
625                 return TRUE;
626         }
627
628         /* --------------------
629          * If this process (under any XID) is a holder of the lock,
630          * then there is no conflict, either.
631          * --------------------
632          */
633         LockCountMyLocks(holder->tag.lock, MyProc, myHolders);
634         if (myHolders[lockmode] != 0)
635         {
636                 GrantLock(lock, holder, lockmode);
637                 HOLDER_PRINT("LockAcquire: my other XID owning", holder);
638                 SpinRelease(masterLock);
639                 return TRUE;
640         }
641
642         /*
643          * If lock requested conflicts with locks requested by waiters...
644          */
645         if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
646         {
647                 /*
648                  * If my process doesn't hold any locks that conflict with waiters,
649                  * then force this request to sleep, so that prior waiters get first chance.
650                  */
651                 for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
652                 {
653                         if (myHolders[i] > 0 &&
654                                 lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
655                                 break;                  /* yes, there is a conflict */
656                 }
657
658                 if (i > lockMethodTable->ctl->numLockModes)
659                 {
660                         HOLDER_PRINT("LockAcquire: another proc already waiting",
661                                                  holder);
662                         status = STATUS_FOUND;
663                 }
664                 else
665                         status = LockResolveConflicts(lockmethod, lockmode,
666                                                                                   lock, holder,
667                                                                                   MyProc, myHolders);
668         }
669         else
670                 status = LockResolveConflicts(lockmethod, lockmode,
671                                                                           lock, holder,
672                                                                           MyProc, myHolders);
673
674         if (status == STATUS_OK)
675                 GrantLock(lock, holder, lockmode);
676         else if (status == STATUS_FOUND)
677         {
678 #ifdef USER_LOCKS
679
680                 /*
681                  * User locks are non-blocking. If we can't acquire a lock we must
682                  * remove the holder entry and return FALSE without waiting.
683                  */
684                 if (lockmethod == USER_LOCKMETHOD)
685                 {
686                         if (holder->nHolding == 0)
687                         {
688                                 SHMQueueDelete(&holder->queue);
689                                 holder = (HOLDER *) hash_search(holderTable,
690                                                                                                 (Pointer) holder,
691                                                                                                 HASH_REMOVE, &found);
692                                 if (!holder || !found)
693                                         elog(NOTICE, "LockAcquire: remove holder, table corrupted");
694                         }
695                         else
696                                 HOLDER_PRINT("LockAcquire: NHOLDING", holder);
697                         lock->nHolding--;
698                         lock->holders[lockmode]--;
699                         LOCK_PRINT("LockAcquire: user lock failed", lock, lockmode);
700                         Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
701                         Assert(lock->nActive <= lock->nHolding);
702                         SpinRelease(masterLock);
703                         return FALSE;
704                 }
705 #endif /* USER_LOCKS */
706
707                 /*
708                  * Construct bitmask of locks this process holds on this object.
709                  */
710                 {
711                         int                     holdLock = 0;
712                         int                     tmpMask;
713
714                         for (i = 1, tmpMask = 2;
715                                  i <= lockMethodTable->ctl->numLockModes;
716                                  i++, tmpMask <<= 1)
717                         {
718                                 if (myHolders[i] > 0)
719                                         holdLock |= tmpMask;
720                         }
721                         MyProc->holdLock = holdLock;
722                 }
723
724                 /*
725                  * Sleep till someone wakes me up.
726                  */
727                 status = WaitOnLock(lockmethod, lockmode, lock, holder);
728
729                 /*
730                  * Check the holder entry status, in case something in the ipc
731                  * communication doesn't work correctly.
732                  */
733                 if (!((holder->nHolding > 0) && (holder->holders[lockmode] > 0)))
734                 {
735                         HOLDER_PRINT("LockAcquire: INCONSISTENT", holder);
736                         LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
737                         /* Should we retry ? */
738                         SpinRelease(masterLock);
739                         return FALSE;
740                 }
741                 HOLDER_PRINT("LockAcquire: granted", holder);
742                 LOCK_PRINT("LockAcquire: granted", lock, lockmode);
743         }
744
745         SpinRelease(masterLock);
746
747         return status == STATUS_OK;
748 }
749
750 /* ----------------------------
751  * LockResolveConflicts -- test for lock conflicts
752  *
753  * NOTES:
754  *              Here's what makes this complicated: one transaction's
755  * locks don't conflict with one another.  When many processes
756  * hold locks, each has to subtract off the others' locks when
757  * determining whether or not any new lock acquired conflicts with
758  * the old ones.
759  *
760  * The caller can optionally pass the process's total holders counts, if
761  * known.  If NULL is passed then these values will be computed internally.
762  * ----------------------------
763  */
764 int
765 LockResolveConflicts(LOCKMETHOD lockmethod,
766                                          LOCKMODE lockmode,
767                                          LOCK *lock,
768                                          HOLDER *holder,
769                                          PROC *proc,
770                                          int *myHolders)                /* myHolders[] array or NULL */
771 {
772         LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
773         int                     numLockModes = lockctl->numLockModes;
774         int                     bitmask;
775         int                     i,
776                                 tmpMask;
777         int                     localHolders[MAX_LOCKMODES];
778
779         Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0));
780
781         /* ----------------------------
782          * first check for global conflicts: If no locks conflict
783          * with mine, then I get the lock.
784          *
785          * Checking for conflict: lock->mask represents the types of
786          * currently held locks.  conflictTab[lockmode] has a bit
787          * set for each type of lock that conflicts with mine.  Bitwise
788          * compare tells if there is a conflict.
789          * ----------------------------
790          */
791         if (!(lockctl->conflictTab[lockmode] & lock->mask))
792         {
793                 HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
794                 return STATUS_OK;
795         }
796
797         /* ------------------------
798          * Rats.  Something conflicts. But it could still be my own
799          * lock.  We have to construct a conflict mask
800          * that does not reflect our own locks.  Locks held by the current
801          * process under another XID also count as "our own locks".
802          * ------------------------
803          */
804         if (myHolders == NULL)
805         {
806                 /* Caller didn't do calculation of total holding for me */
807                 LockCountMyLocks(holder->tag.lock, proc, localHolders);
808                 myHolders = localHolders;
809         }
810
811         /* Compute mask of lock types held by other processes */
812         bitmask = 0;
813         tmpMask = 2;
814         for (i = 1; i <= numLockModes; i++, tmpMask <<= 1)
815         {
816                 if (lock->activeHolders[i] != myHolders[i])
817                         bitmask |= tmpMask;
818         }
819
820         /* ------------------------
821          * now check again for conflicts.  'bitmask' describes the types
822          * of locks held by other processes.  If one of these
823          * conflicts with the kind of lock that I want, there is a
824          * conflict and I have to sleep.
825          * ------------------------
826          */
827         if (!(lockctl->conflictTab[lockmode] & bitmask))
828         {
829                 /* no conflict. OK to get the lock */
830                 HOLDER_PRINT("LockResolveConflicts: resolved", holder);
831                 return STATUS_OK;
832         }
833
834         HOLDER_PRINT("LockResolveConflicts: conflicting", holder);
835         return STATUS_FOUND;
836 }
837
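
/*
 * A worked example of the subtraction above: suppose this backend already
 * holds one ShareLock (mode 4) on the object and another backend holds one
 * as well, so lock->activeHolders[4] == 2 while myHolders[4] == 1.  The two
 * counts differ, so bit (1 << 4) goes into 'bitmask'.  Whether the new
 * request then sleeps depends only on whether conflictTab[lockmode] has that
 * bit set; locks counted in myHolders never put their own holder to sleep.
 * (ShareLock is used purely for illustration here.)
 */
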
838 /*
839  * LockCountMyLocks --- Count total number of locks held on a given lockable
840  *              object by a given process (under any transaction ID).
841  *
842  * XXX This could be rather slow if the process holds a large number of locks.
843  * Perhaps it could be sped up if we kept yet a third hashtable of per-
844  * process lock information.  However, for the normal case where a transaction
845  * doesn't hold a large number of locks, keeping such a table would probably
846  * be a net slowdown.
847  */
848 static void
849 LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolders)
850 {
851         HOLDER     *holder = NULL;
852         HOLDER     *nextHolder = NULL;
853         SHM_QUEUE  *lockQueue = &(proc->lockQueue);
854         SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
855         int                     i;
856
857         MemSet(myHolders, 0, MAX_LOCKMODES * sizeof(int));
858
859         if (SHMQueueEmpty(lockQueue))
860                 return;
861
862         SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
863
864         do
865         {
866                 /* ---------------------------
867                  * XXX Here we assume the shared memory queue is circular and
868                  * that we know its internal structure.  Should have some sort of
869                  * macros to allow one to walk it.      mer 20 July 1991
870                  * ---------------------------
871                  */
872                 if (holder->queue.next == end)
873                         nextHolder = NULL;
874                 else
875                         SHMQueueFirst(&holder->queue,
876                                                   (Pointer *) &nextHolder, &nextHolder->queue);
877
878                 if (lockOffset == holder->tag.lock)
879                 {
880                         for (i = 1; i < MAX_LOCKMODES; i++)
881                         {
882                                 myHolders[i] += holder->holders[i];
883                         }
884                 }
885
886                 holder = nextHolder;
887         } while (holder);
888 }
889
890 /*
891  * LockGetMyHoldLocks -- compute bitmask of lock types held by a process
892  *              for a given lockable object.
893  */
894 static int
895 LockGetMyHoldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
896 {
897         int                     myHolders[MAX_LOCKMODES];
898         int                     holdLock = 0;
899         int                     i,
900                                 tmpMask;
901
902         LockCountMyLocks(lockOffset, proc, myHolders);
903
904         for (i = 1, tmpMask = 2;
905                  i < MAX_LOCKMODES;
906                  i++, tmpMask <<= 1)
907         {
908                 if (myHolders[i] > 0)
909                         holdLock |= tmpMask;
910         }
911         return holdLock;
912 }
913
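
/*
 * For example, a process holding AccessShareLock (mode 1) and
 * RowExclusiveLock (mode 3) on the object gets
 * holdLock == (1 << 1) | (1 << 3) == 0x0A; the same encoding is stored in
 * MyProc->holdLock before sleeping in LockAcquire() above.
 */
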
914 /*
915  * GrantLock -- update the lock and holder data structures to show
916  *              the new lock has been granted.
917  */
918 void
919 GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
920 {
921         lock->nActive++;
922         lock->activeHolders[lockmode]++;
923         lock->mask |= BITS_ON[lockmode];
924         LOCK_PRINT("GrantLock", lock, lockmode);
925         Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] > 0));
926         Assert(lock->nActive <= lock->nHolding);
927         holder->holders[lockmode]++;
928         holder->nHolding++;
929         Assert((holder->nHolding > 0) && (holder->holders[lockmode] > 0));
930 }
931
932 /*
933  * WaitOnLock -- wait to acquire a lock
934  *
935  * The locktable spinlock must be held at entry.
936  */
937 static int
938 WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
939                    LOCK *lock, HOLDER *holder)
940 {
941         LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
942         char       *new_status,
943                            *old_status;
944
945         Assert(lockmethod < NumLockMethods);
946
947         /*
948          * the waitqueue is ordered by priority. I insert myself according to
949          * the priority of the lock I am acquiring.
950          *
951          * SYNC NOTE: I am assuming that the lock table spinlock is sufficient
952          * synchronization for this queue.      That will not be true if/when
953          * people can be deleted from the queue by a SIGINT or something.
954          */
955         LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode);
956
957         old_status = pstrdup(get_ps_display());
958         new_status = (char *) palloc(strlen(old_status) + 10);
959         strcpy(new_status, old_status);
960         strcat(new_status, " waiting");
961         set_ps_display(new_status);
962
963         if (ProcSleep(lockMethodTable->ctl,
964                                   lockmode,
965                                   lock,
966                                   holder) != NO_ERROR)
967         {
968                 /* -------------------
969                  * We failed as a result of a deadlock, see HandleDeadLock().
970                  * Decrement the lock nHolding and holders fields as
971                  * we are no longer waiting on this lock.  Removal of the holder and
972                  * lock objects, if no longer needed, will happen in xact cleanup.
973                  * -------------------
974                  */
975                 lock->nHolding--;
976                 lock->holders[lockmode]--;
977                 LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
978                 Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
979                 Assert(lock->nActive <= lock->nHolding);
980                 if (lock->activeHolders[lockmode] == lock->holders[lockmode])
981                         lock->waitMask &= BITS_OFF[lockmode];
982                 SpinRelease(lockMethodTable->ctl->masterLock);
983                 elog(ERROR, DeadLockMessage);
984                 /* not reached */
985         }
986
987         if (lock->activeHolders[lockmode] == lock->holders[lockmode])
988                 lock->waitMask &= BITS_OFF[lockmode];
989
990         set_ps_display(old_status);
991         pfree(old_status);
992         pfree(new_status);
993
994         LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode);
995         return STATUS_OK;
996 }
997
998 /*
999  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
1000  *              release it.
1001  *
1002  * Side Effects: if the lock no longer conflicts with the highest
1003  *              priority waiting process, that process is granted the lock
1004  *              and awoken. (We have to grant the lock here to avoid a
1005  *              race between the waking process and any new process to
1006  *              come along and request the lock.)
1007  */
1008 bool
1009 LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
1010                         TransactionId xid, LOCKMODE lockmode)
1011 {
1012         LOCK       *lock;
1013         SPINLOCK        masterLock;
1014         bool            found;
1015         LOCKMETHODTABLE *lockMethodTable;
1016         HOLDER     *holder;
1017         HOLDERTAG       holdertag;
1018         HTAB       *holderTable;
1019         bool            wakeupNeeded = true;
1020
1021 #ifdef LOCK_DEBUG
1022         if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
1023         elog(DEBUG, "LockRelease: user lock tag [%u] %d", locktag->objId.blkno, lockmode);
1024 #endif
1025
1026         /* ???????? This must be changed when short-term locks come into use */
1027         locktag->lockmethod = lockmethod;
1028
1029         Assert(lockmethod < NumLockMethods);
1030         lockMethodTable = LockMethodTable[lockmethod];
1031         if (!lockMethodTable)
1032         {
1033                 elog(NOTICE, "lockMethodTable is null in LockRelease");
1034                 return FALSE;
1035         }
1036
1037         if (LockingIsDisabled)
1038                 return TRUE;
1039
1040         masterLock = lockMethodTable->ctl->masterLock;
1041         SpinAcquire(masterLock);
1042
1043         /*
1044          * Find a lock with this tag
1045          */
1046         Assert(lockMethodTable->lockHash->hash == tag_hash);
1047         lock = (LOCK *) hash_search(lockMethodTable->lockHash, (Pointer) locktag,
1048                                                                 HASH_FIND, &found);
1049
1050         /*
1051          * let the caller print its own error message, too. Do not
1052          * elog(ERROR).
1053          */
1054         if (!lock)
1055         {
1056                 SpinRelease(masterLock);
1057                 elog(NOTICE, "LockRelease: locktable corrupted");
1058                 return FALSE;
1059         }
1060
1061         if (!found)
1062         {
1063                 SpinRelease(masterLock);
1064                 elog(NOTICE, "LockRelease: no such lock");
1065                 return FALSE;
1066         }
1067         LOCK_PRINT("LockRelease: found", lock, lockmode);
1068         Assert((lock->nHolding > 0) && (lock->holders[lockmode] >= 0));
1069         Assert((lock->nActive > 0) && (lock->activeHolders[lockmode] >= 0));
1070         Assert(lock->nActive <= lock->nHolding);
1071
1072         /*
1073          * Find the holder entry for this holder.
1074          */
1075         MemSet(&holdertag, 0, sizeof(HOLDERTAG)); /* must clear padding, needed */
1076         holdertag.lock = MAKE_OFFSET(lock);
1077         holdertag.pid = MyProcPid;
1078         TransactionIdStore(xid, &holdertag.xid);
1079
1080         holderTable = lockMethodTable->holderHash;
1081         holder = (HOLDER *) hash_search(holderTable, (Pointer) &holdertag,
1082                                                                         HASH_FIND_SAVE, &found);
1083         if (!holder || !found)
1084         {
1085                 SpinRelease(masterLock);
1086 #ifdef USER_LOCKS
1087                 if (!found && lockmethod == USER_LOCKMETHOD)
1088             elog(NOTICE, "LockRelease: no lock with this tag");
1089                 else
1090 #endif
1091                         elog(NOTICE, "LockRelease: holder table corrupted");
1092                 return FALSE;
1093         }
1094         HOLDER_PRINT("LockRelease: found", holder);
1095         Assert(holder->tag.lock == MAKE_OFFSET(lock));
1096
1097         /*
1098          * Check that we are actually holding a lock of the type we want to
1099          * release.
1100          */
1101         if (!(holder->holders[lockmode] > 0))
1102         {
1103                 SpinRelease(masterLock);
1104                 HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
1105                 elog(NOTICE, "LockRelease: you don't own a lock of type %s",
1106                          lock_types[lockmode]);
1107                 Assert(holder->holders[lockmode] >= 0);
1108                 return FALSE;
1109         }
1110         Assert(holder->nHolding > 0);
1111
1112         /*
1113          * fix the general lock stats
1114          */
1115         lock->nHolding--;
1116         lock->holders[lockmode]--;
1117         lock->nActive--;
1118         lock->activeHolders[lockmode]--;
1119
1120         if (!(lock->activeHolders[lockmode]))
1121         {
1122                 /* change the conflict mask.  No more of this lock type. */
1123                 lock->mask &= BITS_OFF[lockmode];
1124         }
1125
1126 #ifdef NOT_USED
1127         /* --------------------------
1128          * If there are still active locks of the type I just released, no one
1129          * should be woken up.  Whoever is asleep will still conflict
1130          * with the remaining locks.
1131          * --------------------------
1132          */
1133         if (lock->activeHolders[lockmode])
1134                 wakeupNeeded = false;
1135         else
1136 #endif
1137
1138                 /*
1139                  * The above is no longer valid (due to MVCC lock modes). Actually
1140                  * we should compare activeHolders[lockmode] with the number of
1141                  * waiters holding a lock of this type and try to wake up only if
1142                  * these numbers are equal (and the released lock conflicts with locks
1143                  * requested by waiters). For the moment we only check the last
1144                  * condition.
1145                  */
1146         if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
1147                 wakeupNeeded = true;
1148
1149         LOCK_PRINT("LockRelease: updated", lock, lockmode);
1150         Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
1151         Assert((lock->nActive >= 0) && (lock->activeHolders[lockmode] >= 0));
1152         Assert(lock->nActive <= lock->nHolding);
1153
1154         if (!lock->nHolding)
1155         {
1156                 /* ------------------
1157                  * if there's no one waiting in the queue,
1158                  * we just released the last lock on this object.
1159                  * Delete it from the lock table.
1160                  * ------------------
1161                  */
1162                 Assert(lockMethodTable->lockHash->hash == tag_hash);
1163                 lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1164                                                                         (Pointer) &(lock->tag),
1165                                                                         HASH_REMOVE,
1166                                                                         &found);
1167                 Assert(lock && found);
1168                 wakeupNeeded = false;
1169         }
1170
1171         /*
1172          * Now fix the per-holder lock stats.
1173          */
1174         holder->holders[lockmode]--;
1175         holder->nHolding--;
1176         HOLDER_PRINT("LockRelease: updated", holder);
1177         Assert((holder->nHolding >= 0) && (holder->holders[lockmode] >= 0));
1178
1179         /*
1180          * If this was my last hold on this lock, delete my entry in the holder
1181          * table.
1182          */
1183         if (!holder->nHolding)
1184         {
1185                 if (holder->queue.prev == INVALID_OFFSET)
1186                         elog(NOTICE, "LockRelease: holder.prev == INVALID_OFFSET");
1187                 if (holder->queue.next == INVALID_OFFSET)
1188                         elog(NOTICE, "LockRelease: holder.next == INVALID_OFFSET");
1189                 if (holder->queue.next != INVALID_OFFSET)
1190                         SHMQueueDelete(&holder->queue);
1191                 HOLDER_PRINT("LockRelease: deleting", holder);
1192                 holder = (HOLDER *) hash_search(holderTable, (Pointer) &holder,
1193                                                                                 HASH_REMOVE_SAVED, &found);
1194                 if (!holder || !found)
1195                 {
1196                         SpinRelease(masterLock);
1197                         elog(NOTICE, "LockRelease: remove holder, table corrupted");
1198                         return FALSE;
1199                 }
1200         }
1201
1202         if (wakeupNeeded)
1203                 ProcLockWakeup(lockmethod, lock);
1204 #ifdef LOCK_DEBUG
1205         else if (LOCK_DEBUG_ENABLED(lock))
1206         elog(DEBUG, "LockRelease: no wakeup needed");
1207 #endif
1208
1209         SpinRelease(masterLock);
1210         return TRUE;
1211 }
1212
1213 /*
1214  * LockReleaseAll -- Release all locks in a process's lock queue.
1215  *
1216  * Well, not really *all* locks.
1217  *
1218  * If 'allxids' is TRUE, all locks of the specified lock method are
1219  * released, regardless of transaction affiliation.
1220  *
1221  * If 'allxids' is FALSE, all locks of the specified lock method and
1222  * specified XID are released.
1223  */
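
/*
 * A minimal sketch of the two typical call patterns, assuming the standard
 * DEFAULT_LOCKMETHOD table (the real call sites are in transaction and
 * process cleanup code, not shown here):
 *
 *		at transaction commit or abort, release only this XID's locks:
 *			LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc,
 *						   false, GetCurrentTransactionId());
 *
 *		at backend exit, release everything this process still holds:
 *			LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc,
 *						   true, InvalidTransactionId);
 */
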
1224 bool
1225 LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
1226                            bool allxids, TransactionId xid)
1227 {
1228         HOLDER     *holder = NULL;
1229         HOLDER     *nextHolder = NULL;
1230         SHM_QUEUE  *lockQueue = &(proc->lockQueue);
1231         SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1232         SPINLOCK        masterLock;
1233         LOCKMETHODTABLE *lockMethodTable;
1234         int                     i,
1235                                 numLockModes;
1236         LOCK       *lock;
1237         bool            found;
1238         int                     nleft;
1239
1240 #ifdef LOCK_DEBUG
1241         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1242                 elog(DEBUG, "LockReleaseAll: lockmethod=%d, pid=%d",
1243                          lockmethod, MyProcPid);
1244 #endif
1245
1246         Assert(lockmethod < NumLockMethods);
1247         lockMethodTable = LockMethodTable[lockmethod];
1248         if (!lockMethodTable)
1249         {
1250                 elog(NOTICE, "LockReleaseAll: bad lockmethod %d", lockmethod);
1251                 return FALSE;
1252         }
1253
1254         if (SHMQueueEmpty(lockQueue))
1255                 return TRUE;
1256
1257         numLockModes = lockMethodTable->ctl->numLockModes;
1258         masterLock = lockMethodTable->ctl->masterLock;
1259
1260         SpinAcquire(masterLock);
1261
1262         SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
1263
1264         nleft = 0;
1265
1266         do
1267         {
1268                 bool            wakeupNeeded = false;
1269
1270                 /* ---------------------------
1271                  * XXX Here we assume the shared memory queue is circular and
1272                  * that we know its internal structure.  Should have some sort of
1273                  * macros to allow one to walk it.      mer 20 July 1991
1274                  * ---------------------------
1275                  */
1276                 if (holder->queue.next == end)
1277                         nextHolder = NULL;
1278                 else
1279                         SHMQueueFirst(&holder->queue,
1280                                                   (Pointer *) &nextHolder, &nextHolder->queue);
1281
1282                 Assert(holder->tag.pid == proc->pid);
1283
1284                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1285
1286                 /* Ignore items that are not of the lockmethod to be removed */
1287                 if (LOCK_LOCKMETHOD(*lock) != lockmethod)
1288                 {
1289                         nleft++;
1290                         goto next_item;
1291                 }
1292
1293                 /* If not allxids, ignore items that are of the wrong xid */
1294                 if (!allxids && xid != holder->tag.xid)
1295                 {
1296                         nleft++;
1297                         goto next_item;
1298                 }
1299
1300                 HOLDER_PRINT("LockReleaseAll", holder);
1301                 LOCK_PRINT("LockReleaseAll", lock, 0);
1302                 Assert(lock->nHolding > 0);
1303                 Assert(lock->nActive > 0);
1304                 Assert(lock->nActive <= lock->nHolding);
1305                 Assert(holder->nHolding >= 0);
1306                 Assert(holder->nHolding <= lock->nHolding);
1307
1308                 /* ------------------
1309                  * fix the general lock stats
1310                  * ------------------
1311                  */
1312                 if (lock->nHolding != holder->nHolding)
1313                 {
1314                         for (i = 1; i <= numLockModes; i++)
1315                         {
1316                                 Assert(holder->holders[i] >= 0);
1317                                 lock->holders[i] -= holder->holders[i];
1318                                 lock->activeHolders[i] -= holder->holders[i];
1319                                 Assert((lock->holders[i] >= 0) \
1320                                            &&(lock->activeHolders[i] >= 0));
1321                                 if (!lock->activeHolders[i])
1322                                         lock->mask &= BITS_OFF[i];
1323
1324                                 /*
1325                                  * Read comments in LockRelease
1326                                  */
1327                                 if (!wakeupNeeded && holder->holders[i] > 0 &&
1328                                         lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
1329                                         wakeupNeeded = true;
1330                         }
1331                         lock->nHolding -= holder->nHolding;
1332                         lock->nActive -= holder->nHolding;
1333                         Assert((lock->nHolding >= 0) && (lock->nActive >= 0));
1334                         Assert(lock->nActive <= lock->nHolding);
1335                 }
1336                 else
1337                 {
1338                         /* --------------
1339                          * set nHolding to zero so that we can garbage collect the lock
1340                          * down below...
1341                          * --------------
1342                          */
1343                         lock->nHolding = 0;
1344                         /* Fix the lock status, just for next LOCK_PRINT message. */
1345                         for (i = 1; i <= numLockModes; i++)
1346                         {
1347                                 Assert(lock->holders[i] == lock->activeHolders[i]);
1348                                 lock->holders[i] = lock->activeHolders[i] = 0;
1349                         }
1350                 }
1351                 LOCK_PRINT("LockReleaseAll: updated", lock, 0);
1352
1353                 HOLDER_PRINT("LockReleaseAll: deleting", holder);
1354
1355                 /*
1356                  * Remove the holder entry from the process' lock queue
1357                  */
1358                 SHMQueueDelete(&holder->queue);
1359
1360                 /*
1361                  * remove the holder entry from the hashtable
1362                  */
1363                 holder = (HOLDER *) hash_search(lockMethodTable->holderHash,
1364                                                                                 (Pointer) holder,
1365                                                                                 HASH_REMOVE,
1366                                                                                 &found);
1367                 if (!holder || !found)
1368                 {
1369                         SpinRelease(masterLock);
1370                         elog(NOTICE, "LockReleaseAll: holder table corrupted");
1371                         return FALSE;
1372                 }
1373
1374                 if (!lock->nHolding)
1375                 {
1376                         /* --------------------
1377                          * No one holds or awaits this lock any more; we just released
1378                          * the last reference, so garbage-collect the lock object.
1379                          * --------------------
1380                          */
1381                         LOCK_PRINT("LockReleaseAll: deleting", lock, 0);
1382                         Assert(lockMethodTable->lockHash->hash == tag_hash);
1383                         lock = (LOCK *) hash_search(lockMethodTable->lockHash,
1384                                                                                 (Pointer) &(lock->tag),
1385                                                                                 HASH_REMOVE, &found);
1386                         if ((!lock) || (!found))
1387                         {
1388                                 SpinRelease(masterLock);
1389                                 elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
1390                                 return FALSE;
1391                         }
1392                 }
1393                 else if (wakeupNeeded)
1394                         ProcLockWakeup(lockmethod, lock);
1395
1396 next_item:
1397                 holder = nextHolder;
1398         } while (holder);
1399
1400         /*
1401          * Reinitialize the queue only if nothing has been left in it.
1402          */
1403         if (nleft == 0)
1404         {
1405 #ifdef LOCK_DEBUG
1406                 if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1407                         elog(DEBUG, "LockReleaseAll: reinitializing lockQueue");
1408 #endif
1409                 SHMQueueInit(lockQueue);
1410         }
1411
1412         SpinRelease(masterLock);
1413 #ifdef LOCK_DEBUG
1414         if (lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
1415                 elog(DEBUG, "LockReleaseAll: done");
1416 #endif
1417
1418         return TRUE;
1419 }
1420
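/*
 * Illustrative sketch only -- not part of the original file.  It shows one
 * plausible way transaction-cleanup code could use LockReleaseAll.  The
 * helper name and the parameter list assumed for LockReleaseAll
 * (lockmethod, proc, allxids, xid) are guesses inferred from the variables
 * the function body above uses; the real call sites live in proc.c/lmgr.c.
 */
#if 0
static void
ReleaseLocksAtTransactionEnd(void)
{
        TransactionId xid = GetCurrentTransactionId();

        /* Regular locks: release only those taken under the current xid. */
        if (!LockReleaseAll(DEFAULT_LOCKMETHOD, MyProc, false, xid))
                elog(NOTICE, "ReleaseLocksAtTransactionEnd: lock release failed");

        /* User locks outlive transactions, so release them for all xids. */
        if (!LockReleaseAll(USER_LOCKMETHOD, MyProc, true, xid))
                elog(NOTICE, "ReleaseLocksAtTransactionEnd: user lock release failed");
}
#endif
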
1421 int
1422 LockShmemSize(int maxBackends)
1423 {
1424         int                     size = 0;
1425
1426         size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
1427         size += MAXALIGN(maxBackends * sizeof(PROC));           /* each MyProc */
1428         /* each lockMethodTable->ctl */
1429         size += MAXALIGN(maxBackends * sizeof(LOCKMETHODCTL));
1430
1431         /* lockHash table */
1432         size += hash_estimate_size(NLOCKENTS(maxBackends),
1433                                                            SHMEM_LOCKTAB_KEYSIZE,
1434                                                            SHMEM_LOCKTAB_DATASIZE);
1435
1436         /* holderHash table */
1437         size += hash_estimate_size(NLOCKENTS(maxBackends),
1438                                                            SHMEM_HOLDERTAB_KEYSIZE,
1439                                                            SHMEM_HOLDERTAB_DATASIZE);
1440
1441         /*
1442          * Since the lockHash entry count above is only an estimate, add 10%
1443          * safety margin.
1444          */
1445         size += size / 10;
1446
1447         return size;
1448 }
1449
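/*
 * Illustrative sketch only -- not part of the original file.  It shows how
 * postmaster startup code might fold LockShmemSize() into the total
 * shared-memory request; the function name below is hypothetical and the
 * real accounting lives in the shared-memory initialization code.
 */
#if 0
static int
EstimateLockManagerShmem(int maxBackends)
{
        int                     size = 0;

        /* lock manager's share: PROC array, lock and holder hashes, +10% slop */
        size += LockShmemSize(maxBackends);

        /* ... other subsystems would add their own estimates here ... */

        return size;
}
#endif
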
1450 /*
1451  * DeadLockCheck -- check for deadlocks involving a given process
1452  *
1453  * User locks are never blocked on, so there is no point in testing them
1454  * for deadlock: there is no blocking and no timer for the block.
1455  *
1456  * Given the list of locks a process holds and the lock it is sleeping on,
1457  * this code checks whether any process waiting on one of those locks holds
1458  * the lock it is waiting for; if not, it recurses into the waiters' locks.
1459  * (An illustrative usage sketch appears after the function body.)
1460  *
1461  * The caller must already hold the master lock.
1462  */
1463 bool
1464 DeadLockCheck(PROC *thisProc, LOCK *findlock)
1465 {
1466         HOLDER     *holder = NULL;
1467         HOLDER     *nextHolder = NULL;
1468         PROC       *waitProc;
1469         PROC_QUEUE *waitQueue;
1470         SHM_QUEUE  *lockQueue = &(thisProc->lockQueue);
1471         SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
1472         LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
1473         LOCK       *lock;
1474         int                     i,
1475                                 j;
1476         bool            first_run = (thisProc == MyProc);
1477
1478         static PROC *checked_procs[MAXBACKENDS];
1479         static int      nprocs;
1480
1481         /* initialize at start of recursion */
1482         if (first_run)
1483         {
1484                 checked_procs[0] = thisProc;
1485                 nprocs = 1;
1486         }
1487
1488         if (SHMQueueEmpty(lockQueue))
1489                 return false;
1490
1491         SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
1492
1493         do
1494         {
1495                 /* ---------------------------
1496                  * XXX Here we assume the shared memory queue is circular and
1497                  * that we know its internal structure.  Should have some sort of
1498                  * macros to allow one to walk it.      mer 20 July 1991
1499                  * ---------------------------
1500                  */
1501                 if (holder->queue.next == end)
1502                         nextHolder = NULL;
1503                 else
1504                         SHMQueueFirst(&holder->queue,
1505                                                   (Pointer *) &nextHolder, &nextHolder->queue);
1506
1507                 Assert(holder->tag.pid == thisProc->pid);
1508
1509                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1510
1511                 /* Ignore user locks */
1512                 if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD)
1513                         goto nxtl;
1514
1515                 HOLDER_PRINT("DeadLockCheck", holder);
1516                 LOCK_PRINT("DeadLockCheck", lock, 0);
1517
1518                 /*
1519                  * waitLock is always in the waiting proc's lockQueue; if !first_run,
1520                  * the caller above us handles waitLock's waitProcs queue.
1521                  */
1522                 if (thisProc->waitLock == lock && !first_run)
1523                         goto nxtl;
1524
1525                 /*
1526                  * We found a proc that holds findlock while sleeping on another
1527                  * lock; check whether it blocks us or any of the other waiters.
1528                  */
1529                 if (lock == findlock && !first_run)
1530                 {
1531                         int                     lm;
1532
1533                         Assert(holder->nHolding > 0);
1534                         for (lm = 1; lm <= lockctl->numLockModes; lm++)
1535                         {
1536                                 if (holder->holders[lm] > 0 &&
1537                                         lockctl->conflictTab[lm] & findlock->waitMask)
1538                                         return true;
1539                         }
1540
1541                         /*
1542                          * Else - get the next lock from thisProc's lockQueue
1543                          */
1544                         goto nxtl;
1545                 }
1546
1547                 waitQueue = &(lock->waitProcs);
1548                 waitProc = (PROC *) MAKE_PTR(waitQueue->links.prev);
1549
1550                 /*
1551                  * NOTE: loop must count down because we want to examine each item
1552                  * in the queue even if waitQueue->size decreases due to waking up
1553                  * some of the processes.
1554                  */
1555                 for (i = waitQueue->size; --i >= 0; )
1556                 {
1557                         Assert(waitProc->waitLock == lock);
1558                         if (waitProc == thisProc)
1559                         {
1560                                 /* This should only happen at first level */
1561                                 Assert(waitProc == MyProc);
1562                                 goto nextWaitProc;
1563                         }
1564                         if (lock == findlock)           /* first_run also true */
1565                         {
1566                                 /*
1567                                  * If we are blocked by the locks waitProc holds...
1568                                  */
1569                                 if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->holdLock)
1570                                 {
1571                                         /* ...and waitProc is in turn blocked by us -> deadlock */
1572                                         if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock)
1573                                                 return true;
1574                                         /* we shouldn't look at lockQueue of our blockers */
1575                                         goto nextWaitProc;
1576                                 }
1577
1578                                 /*
1579                                  * If waitProc isn't blocked by the locks we hold, and the
1580                                  * lock modes we request don't conflict, there is no
1581                                  * deadlock here: waitProc isn't blocked by us in any
1582                                  * sense, explicitly or implicitly.  Note that we make no
1583                                  * similar test when !first_run (when thisProc holds, but
1584                                  * is not waiting on, the lock), so below we call
1585                                  * DeadLockCheck for every waitProc in thisProc->lockQueue,
1586                                  * even for waitProcs that thisProc does not block.  Should
1587                                  * we skip those too?  It could save some time...
1588                                  */
1589                                 if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->holdLock) &&
1590                                         !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
1591                                         goto nextWaitProc;
1592                         }
1593
1594                         /*
1595                          * Skip this waiter if already checked.
1596                          */
1597                         for (j = 0; j < nprocs; j++)
1598                         {
1599                                 if (checked_procs[j] == waitProc)
1600                                         goto nextWaitProc;
1601                         }
1602
1603                         /* Recursively check this process's lockQueue. */
1604                         Assert(nprocs < MAXBACKENDS);
1605                         checked_procs[nprocs++] = waitProc;
1606
1607                         if (DeadLockCheck(waitProc, findlock))
1608                         {
1609                                 int                     holdLock;
1610
1611                                 /*
1612                                  * OK, but is waitProc waiting for thisProc?
1613                                  */
1614                                 if (thisProc->waitLock == lock)
1615                                 {
1616                                         Assert(first_run);
1617                                         holdLock = thisProc->holdLock;
1618                                 }
1619                                 else
1620                                 {
1621                                         /* should we cache holdLock to speed this up? */
1622                                         holdLock = LockGetMyHoldLocks(holder->tag.lock, thisProc);
1623                                         Assert(holdLock != 0);
1624                                 }
1625                                 if (lockctl->conflictTab[waitProc->waitLockMode] & holdLock)
1626                                 {
1627                                         /*
1628                                          * Last attempt to avoid the deadlock: try to wake ourselves up.
1629                                          */
1630                                         if (first_run)
1631                                         {
1632                                                 if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
1633                                                                                                  MyProc->waitLockMode,
1634                                                                                                  MyProc->waitLock,
1635                                                                                                  MyProc->waitHolder,
1636                                                                                                  MyProc,
1637                                                                                                  NULL) == STATUS_OK)
1638                                                 {
1639                                                         SetWaitingForLock(false);
1640                                                         GrantLock(MyProc->waitLock,
1641                                                                           MyProc->waitHolder,
1642                                                                           MyProc->waitLockMode);
1643                                                         ProcWakeup(MyProc, NO_ERROR);
1644                                                         return false;
1645                                                 }
1646                                         }
1647                                         return true;
1648                                 }
1649
1650                                 /*
1651                                  * Is waitProc blocked by any other holder?
1652                                  */
1653                                 if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
1654                                                                                  waitProc->waitLockMode,
1655                                                                                  lock,
1656                                                                                  waitProc->waitHolder,
1657                                                                                  waitProc,
1658                                                                                  NULL) != STATUS_OK)
1659                                 {
1660                                         /*
1661                                          * Blocked by others - no deadlock...
1662                                          */
1663                                         LOCK_PRINT("DeadLockCheck: blocked by others",
1664                                                            lock, waitProc->waitLockMode);
1665                                         goto nextWaitProc;
1666                                 }
1667
1668                                 /*
1669                                  * No: wake waitProc up.  This is the case of implicit
1670                                  * blocking: thisProc blocked someone who blocked waitProc
1671                                  * merely because it (or someone else) was already waiting
1672                                  * for the lock.  We wake waitProc up to prevent
1673                                  * starvation.
1674                                  */
1675                                 GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode);
1676                                 waitProc = ProcWakeup(waitProc, NO_ERROR);
1677                                 /*
1678                                  * Use next-proc link returned by ProcWakeup, since this
1679                                  * proc's own links field is now cleared.
1680                                  */
1681                                 continue;
1682                         }
1683
1684 nextWaitProc:
1685                         waitProc = (PROC *) MAKE_PTR(waitProc->links.prev);
1686                 }
1687
1688 nxtl:
1689                 holder = nextHolder;
1690         } while (holder);
1691
1692         /* if we got here, no deadlock */
1693         return false;
1694 }
1695
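/*
 * Illustrative usage sketch only -- not part of the original file.  It shows
 * how a deadlock-timeout path could invoke DeadLockCheck for the current
 * backend, with the master lock held as the function requires.  The handler
 * name and the error message are hypothetical; the real timeout handling in
 * proc.c is more involved (DeadLockCheck may also wake us or other waiters
 * up before returning false).
 */
#if 0
static void
HandleDeadLockTimeout(void)
{
        SPINLOCK        masterLock = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl->masterLock;
        bool            deadlock;

        SpinAcquire(masterLock);
        deadlock = DeadLockCheck(MyProc, MyProc->waitLock);
        SpinRelease(masterLock);

        if (deadlock)
                elog(ERROR, "deadlock detected");
}
#endif
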
1696 #ifdef LOCK_DEBUG
1697 /*
1698  * Dump all locks in MyProc's lockQueue.  The caller must already hold
1699  * the masterLock.
1700  */
1701 void
1702 DumpLocks(void)
1703 {
1704         SHMEM_OFFSET location;
1705         PROC       *proc;
1706         SHM_QUEUE  *lockQueue;
1707         HOLDER     *holder = NULL;
1708         HOLDER     *nextHolder = NULL;
1709         SHMEM_OFFSET end;
1710         LOCK       *lock;
1711         int                     lockmethod = DEFAULT_LOCKMETHOD;
1712         LOCKMETHODTABLE *lockMethodTable;
1713
1714         ShmemPIDLookup(MyProcPid, &location);
1715         if (location == INVALID_OFFSET)
1716                 return;
1717         proc = (PROC *) MAKE_PTR(location);
1718         if (proc != MyProc)
1719                 return;
1720         lockQueue = &proc->lockQueue;
1721         end = MAKE_OFFSET(lockQueue);
1722
1723         Assert(lockmethod < NumLockMethods);
1724         lockMethodTable = LockMethodTable[lockmethod];
1725         if (!lockMethodTable)
1726                 return;
1727
1728         if (proc->waitLock)
1729                 LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0);
1730
1731         if (SHMQueueEmpty(lockQueue))
1732                 return;
1733
1734         SHMQueueFirst(lockQueue, (Pointer *) &holder, &holder->queue);
1735
1736         do
1737         {
1738                 /* ---------------------------
1739                  * XXX Here we assume the shared memory queue is circular and
1740                  * that we know its internal structure.  Should have some sort of
1741                  * macros to allow one to walk it.      mer 20 July 1991
1742                  * ---------------------------
1743                  */
1744                 if (holder->queue.next == end)
1745                         nextHolder = NULL;
1746                 else
1747                         SHMQueueFirst(&holder->queue,
1748                                                   (Pointer *) &nextHolder, &nextHolder->queue);
1749
1750                 Assert(holder->tag.pid == proc->pid);
1751
1752                 lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1753
1754                 HOLDER_PRINT("DumpLocks", holder);
1755                 LOCK_PRINT("DumpLocks", lock, 0);
1756
1757                 holder = nextHolder;
1758         } while (holder);
1759 }
1760
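/*
 * Illustrative sketch only -- not part of the original file.  The "XXX ...
 * should have some sort of macros" comments in LockReleaseAll, DeadLockCheck
 * and DumpLocks ask for a helper that hides the queue-walking idiom; a
 * hypothetical version, encoding the same assumptions the open-coded loops
 * make (the queue is circular, and a node whose next link points back at the
 * list header is the last one), might look like this.
 */
#if 0
/* Return the holder after 'cur', or NULL if 'cur' is the last entry. */
static HOLDER *
NextHolderInQueue(SHM_QUEUE *queueHeader, HOLDER *cur)
{
        HOLDER     *next;

        if (cur->queue.next == MAKE_OFFSET(queueHeader))
                return NULL;                    /* wrapped around to the header: done */
        SHMQueueFirst(&cur->queue, (Pointer *) &next, &next->queue);
        return next;
}
#endif
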
1761 /*
1762  * Dump all locks in the lock table.  The caller must already hold the masterLock.
1763  */
1764 void
1765 DumpAllLocks(void)
1766 {
1767         SHMEM_OFFSET location;
1768         PROC       *proc;
1769         HOLDER     *holder = NULL;
1770         LOCK       *lock;
1771         int                     pid;
1772         int                     lockmethod = DEFAULT_LOCKMETHOD;
1773         LOCKMETHODTABLE *lockMethodTable;
1774         HTAB       *holderTable;
1775         HASH_SEQ_STATUS status;
1776
1777         pid = getpid();
1778         ShmemPIDLookup(pid, &location);
1779         if (location == INVALID_OFFSET)
1780                 return;
1781         proc = (PROC *) MAKE_PTR(location);
1782         if (proc != MyProc)
1783                 return;
1784
1785         Assert(lockmethod < NumLockMethods);
1786         lockMethodTable = LockMethodTable[lockmethod];
1787         if (!lockMethodTable)
1788                 return;
1789
1790         holderTable = lockMethodTable->holderHash;
1791
1792         if (proc->waitLock)
1793                 LOCK_PRINT("DumpAllLocks: waiting on", proc->waitLock, 0);
1794
1795         hash_seq_init(&status, holderTable);
1796         while ((holder = (HOLDER *) hash_seq_search(&status)) &&
1797                    (holder != (HOLDER *) TRUE))
1798         {
1799                 HOLDER_PRINT("DumpAllLocks", holder);
1800
1801                 if (holder->tag.lock)
1802                 {
1803                         lock = (LOCK *) MAKE_PTR(holder->tag.lock);
1804                         LOCK_PRINT("DumpAllLocks", lock, 0);
1805                 }
1806                 else
1807                         elog(DEBUG, "DumpAllLocks: holder->tag.lock = NULL");
1808         }
1809 }
1810
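/*
 * Illustrative sketch only -- not part of the original file.  It isolates the
 * reentrant hash_seq_init()/hash_seq_search() scan pattern that DumpAllLocks
 * uses above.  The counting helper is hypothetical; the loop stops when the
 * scan returns NULL or (HOLDER *) TRUE, exactly as the loop above does.
 */
#if 0
static long
CountHolderEntries(HTAB *holderTable)
{
        HASH_SEQ_STATUS status;
        HOLDER     *holder;
        long            count = 0;

        hash_seq_init(&status, holderTable);
        while ((holder = (HOLDER *) hash_seq_search(&status)) != NULL &&
                   holder != (HOLDER *) TRUE)
                count++;

        return count;
}
#endif
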
1811 #endif /* LOCK_DEBUG */