/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * an LWLock to protect its shared state.
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.10 2002/05/05 00:03:28 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
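/*
 * Illustrative sketch of the usage pattern (names hypothetical): an LWLock
 * guards a short critical section, in shared or exclusive mode:
 *
 *		LWLockAcquire(SomeLock, LW_EXCLUSIVE);		-- writers
 *		... examine and update the shared structure ...
 *		LWLockRelease(SomeLock);
 *
 *		LWLockAcquire(SomeLock, LW_SHARED);			-- readers
 *		... examine the shared structure ...
 *		LWLockRelease(SomeLock);
 *
 * Cancel/die interrupts are held off from acquire to release; see
 * LWLockAcquire and LWLockRelease below.
 */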
#include "postgres.h"

#include "access/clog.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"
typedef struct LWLock
{
	slock_t		mutex;			/* Protects LWLock and queue of PROCs */
	bool		releaseOK;		/* T if ok to release waiters */
	char		exclusive;		/* # of exclusive holders (0 or 1) */
	int			shared;			/* # of shared holders (0..MaxBackends) */
	PROC	   *head;			/* head of list of waiting PROCs */
	PROC	   *tail;			/* tail of list of waiting PROCs */
	/* tail is undefined when head is NULL */
} LWLock;
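/*
 * (To summarize the state encoding: the lock is free when exclusive == 0
 * and shared == 0; a shared request only needs exclusive == 0, so readers
 * can coexist.  See the fast-path tests in LWLockAcquire.)
 */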
/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster.  LWLockIds are indexes into
 * the array.
 */
static LWLock *LWLockArray = NULL;

/* shared counter for dynamic allocation of LWLockIds */
static int *LWLockCounter;
/*
 * We use this array to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	100

static int	num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
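/*
 * (held_lwlocks works as a small stack: LWLockAcquire and
 * LWLockConditionalAcquire append the lockid on success, LWLockRelease
 * removes it, and LWLockReleaseAll pops entries until none remain.)
 */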
#ifdef LOCK_DEBUG
bool		Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
			 where, (int) lockid,
			 (int) lock->exclusive, lock->shared, lock->head,
			 (int) lock->releaseOK);
}

inline static void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}

#else							/* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */
/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
	int			numLocks;

	/*
	 * Possibly this logic should be spread out among the affected
	 * modules, the same way that shmem space estimation is done.  But for
	 * now, there are few enough users of LWLocks that we can get away
	 * with just keeping the knowledge here.
	 */

	/* Predefined LWLocks */
	numLocks = (int) NumFixedLWLocks;

	/* bufmgr.c needs two for each shared buffer */
	numLocks += 2 * NBuffers;

	/* clog.c needs one per CLOG buffer */
	numLocks += NUM_CLOG_BUFFERS;

	/* Perhaps create a few more for use by user-defined modules? */

	return numLocks;
}
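/*
 * (Worked example with purely hypothetical settings: if NumFixedLWLocks
 * were 20, NBuffers 64, and NUM_CLOG_BUFFERS 8, this would yield
 * 20 + 2*64 + 8 = 156 LWLocks.)
 */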
/*
 * Compute shmem space needed for LWLocks.
 */
int
LWLockShmemSize(void)
{
	int			numLocks = NumLWLocks();
	uint32		spaceLocks;

	/* Allocate the LWLocks plus space for shared allocation counter. */
	spaceLocks = numLocks * sizeof(LWLock) + 2 * sizeof(int);
	spaceLocks = MAXALIGN(spaceLocks);

	return (int) spaceLocks;
}
/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
	int			numLocks = NumLWLocks();
	uint32		spaceLocks = LWLockShmemSize();
	LWLock	   *lock;
	int			id;

	/* Allocate space */
	LWLockArray = (LWLock *) ShmemAlloc(spaceLocks);

	/*
	 * Initialize all LWLocks to "unlocked" state
	 */
	for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
	{
		SpinLockInit(&lock->mutex);
		lock->releaseOK = true;
		lock->exclusive = 0;
		lock->shared = 0;
		lock->head = NULL;
		lock->tail = NULL;
	}

	/*
	 * Initialize the dynamic-allocation counter at the end of the array
	 */
	LWLockCounter = (int *) lock;
	LWLockCounter[0] = (int) NumFixedLWLocks;
	LWLockCounter[1] = numLocks;
}
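/*
 * (LWLockCounter[0] is the next LWLockId available for dynamic assignment;
 * LWLockCounter[1] is the total number allocated.  Both live in shared
 * memory just past the lock array, so all backends see the same values.
 * See LWLockAssign.)
 */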
/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * NB: we do not currently try to interlock this.  Could perhaps use
 * ShmemLock spinlock if there were any need to assign LWLockIds after
 * shmem setup.
 */
LWLockId
LWLockAssign(void)
{
	if (LWLockCounter[0] >= LWLockCounter[1])
		elog(FATAL, "No more LWLockIds available");
	return (LWLockId) (LWLockCounter[0]++);
}
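/*
 * Illustrative sketch (all names hypothetical): a module would typically
 * reserve its dynamic LWLock once, during shared-memory initialization,
 * and remember the id for later acquire/release calls.
 */
#ifdef NOT_USED
static LWLockId MyModuleLock;	/* hypothetical */

static void
my_module_shmem_init(void)
{
	MyModuleLock = LWLockAssign();
}
#endif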
/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
void
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = LWLockArray + lockid;
	PROC	   *proc = MyProc;
	bool		retry = false;
	int			extraWaits = 0;

	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Loop here to try to acquire lock after each time we are signaled
	 * by LWLockRelease.
	 *
	 * NOTE: it might seem better to have LWLockRelease actually grant us
	 * the lock, rather than retrying and possibly having to go back to
	 * sleep.  But in practice that is no good because it means a process
	 * swap for every lock acquisition when two or more processes are
	 * contending for the same lock.  Since LWLocks are normally used to
	 * protect not-very-long sections of computation, a process needs to
	 * be able to acquire and release the same lock many times during a
	 * single CPU time slice, even in the presence of contention.  The
	 * efficiency of being able to do that outweighs the inefficiency of
	 * sometimes wasting a process dispatch cycle because the lock is not
	 * free when a released waiter finally gets to run.  See pgsql-hackers
	 * archives for 29-Dec-01.
	 */
	for (;;)
	{
		bool		mustwait;

		/* Acquire mutex.  Time spent holding mutex should be short! */
		SpinLockAcquire_NoHoldoff(&lock->mutex);

		/* If retrying, allow LWLockRelease to release waiters again */
		if (retry)
			lock->releaseOK = true;

		/* If I can get the lock, do so quickly. */
		if (mode == LW_EXCLUSIVE)
		{
			if (lock->exclusive == 0 && lock->shared == 0)
			{
				lock->exclusive++;
				mustwait = false;
			}
			else
				mustwait = true;
		}
		else
		{
			if (lock->exclusive == 0)
			{
				lock->shared++;
				mustwait = false;
			}
			else
				mustwait = true;
		}

		if (!mustwait)
			break;				/* got the lock */

		/*
		 * Add myself to wait queue.
		 *
		 * If we don't have a PROC structure, there's no way to wait. This
		 * should never occur, since MyProc should only be null during
		 * shared memory initialization.
		 */
		if (proc == NULL)
			elog(FATAL, "LWLockAcquire: can't wait without a PROC structure");

		proc->lwWaiting = true;
		proc->lwExclusive = (mode == LW_EXCLUSIVE);
		proc->lwWaitLink = NULL;
		if (lock->head == NULL)
			lock->head = proc;
		else
			lock->tail->lwWaitLink = proc;
		lock->tail = proc;

		/* Can release the mutex now */
		SpinLockRelease_NoHoldoff(&lock->mutex);

		/*
		 * Wait until awakened.
		 *
		 * Since we share the process wait semaphore with the regular lock
		 * manager and ProcWaitForSignal, and we may need to acquire an
		 * LWLock while one of those is pending, it is possible that we get
		 * awakened for a reason other than being signaled by LWLockRelease.
		 * If so, loop back and wait again.  Once we've gotten the LWLock,
		 * re-increment the sema by the number of additional signals
		 * received, so that the lock manager or signal manager will see
		 * the received signal when it next waits.
		 */
		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");

		for (;;)
		{
			/* "false" means cannot accept cancel/die interrupt here. */
			PGSemaphoreLock(&proc->sem, false);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");

		/* Now loop back and try to acquire lock again. */
		retry = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/* Add lock to list of locks held by this backend */
	Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
	held_lwlocks[num_held_lwlocks++] = lockid;

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(&proc->sem);
}
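/*
 * (Concrete instance of the absorbed-wakeup case above: if a signal meant
 * for ProcWaitForSignal arrives while we are blocked in PGSemaphoreLock,
 * we return with lwWaiting still true, count the wakeup in extraWaits,
 * and wait again; once the lock is finally acquired, the trailing
 * PGSemaphoreUnlock loop re-adds each absorbed count to the semaphore.)
 */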
/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = LWLockArray + lockid;
	bool		mustwait;

	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);

	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* If I can get the lock, do so quickly. */
	if (mode == LW_EXCLUSIVE)
	{
		if (lock->exclusive == 0 && lock->shared == 0)
		{
			lock->exclusive++;
			mustwait = false;
		}
		else
			mustwait = true;
	}
	else
	{
		if (lock->exclusive == 0)
		{
			lock->shared++;
			mustwait = false;
		}
		else
			mustwait = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();
		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
		return false;
	}

	/* Add lock to list of locks held by this backend */
	Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
	held_lwlocks[num_held_lwlocks++] = lockid;

	return true;
}
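/*
 * Illustrative sketch (names hypothetical): callers typically fall back
 * to other work, or try again later, when a conditional acquire fails.
 */
#ifdef NOT_USED
static bool
example_try_update(void)
{
	if (!LWLockConditionalAcquire(ExampleLock, LW_EXCLUSIVE))
		return false;			/* lock busy; caller can retry later */
	/* ... update the protected structure ... */
	LWLockRelease(ExampleLock);
	return true;
}
#endif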
/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLockId lockid)
{
	volatile LWLock *lock = LWLockArray + lockid;
	PROC	   *head;
	PROC	   *proc;
	int			i;

	PRINT_LWDEBUG("LWLockRelease", lockid, lock);

	/*
	 * Remove lock from list of locks held.  Usually, but not always, it
	 * will be the latest-acquired lock; so search array backwards.
	 */
	for (i = num_held_lwlocks; --i >= 0;)
	{
		if (lockid == held_lwlocks[i])
			break;
	}
	if (i < 0)
		elog(ERROR, "LWLockRelease: lock %d is not held", (int) lockid);
	num_held_lwlocks--;
	for (; i < num_held_lwlocks; i++)
		held_lwlocks[i] = held_lwlocks[i + 1];

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* Release my hold on lock */
	if (lock->exclusive > 0)
		lock->exclusive--;
	else
	{
		Assert(lock->shared > 0);
		lock->shared--;
	}

	/*
	 * See if I need to awaken any waiters.  If I released a non-last
	 * shared hold, there cannot be anything to do.  Also, do not awaken
	 * any waiters if someone has already awakened waiters that haven't
	 * yet acquired the lock.
	 */
	head = lock->head;
	if (head != NULL)
	{
		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
		{
			/*
			 * Remove the to-be-awakened PROCs from the queue.  If the
			 * front waiter wants exclusive lock, awaken him only.
			 * Otherwise awaken as many waiters as want shared access.
			 */
			proc = head;
			if (!proc->lwExclusive)
			{
				while (proc->lwWaitLink != NULL &&
					   !proc->lwWaitLink->lwExclusive)
					proc = proc->lwWaitLink;
			}
			/* proc is now the last PROC to be released */
			lock->head = proc->lwWaitLink;
			proc->lwWaitLink = NULL;
			/* prevent additional wakeups until retryer gets to run */
			lock->releaseOK = false;
		}
		else
		{
			/* lock is still held, can't awaken anything */
			head = NULL;
		}
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/*
	 * Awaken any waiters I removed from the queue.
	 */
	while (head != NULL)
	{
		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
		proc = head;
		head = proc->lwWaitLink;
		proc->lwWaitLink = NULL;
		proc->lwWaiting = false;
		PGSemaphoreUnlock(&proc->sem);
	}

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();
}
/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after elog(ERROR).  An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery.  We could
 * decrement it below zero if we allowed it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
	while (num_held_lwlocks > 0)
	{
		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */

		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
	}
}