/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * an LWLock to protect its shared state.
 *
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.15 2003/06/11 22:37:45 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
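/*
 * Usage sketch (illustrative only; "MyLock" is a hypothetical LWLockId,
 * not something defined in this file).  Callers pick the lock mode that
 * matches their access pattern:
 *
 *		LWLockAcquire(MyLock, LW_SHARED);		(read-only access)
 *		... read the shared data structure ...
 *		LWLockRelease(MyLock);
 *
 *		LWLockAcquire(MyLock, LW_EXCLUSIVE);	(read/write access)
 *		... modify the shared data structure ...
 *		LWLockRelease(MyLock);
 */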
#include "postgres.h"

#include "access/clog.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"
typedef struct LWLock
{
	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
	bool		releaseOK;		/* T if ok to release waiters */
	char		exclusive;		/* # of exclusive holders (0 or 1) */
	int			shared;			/* # of shared holders (0..MaxBackends) */
	PGPROC	   *head;			/* head of list of waiting PGPROCs */
	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
	/* tail is undefined when head is NULL */
} LWLock;
/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster.  LWLockIds are indexes into
 * the array.
 */
static LWLock *LWLockArray = NULL;
/* shared counter for dynamic allocation of LWLockIds */
static int *LWLockCounter;
/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	100

static int	num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
#ifdef LOCK_DEBUG
bool		Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
			 where, (int) lockid,
			 (int) lock->exclusive, lock->shared, lock->head,
			 (int) lock->releaseOK);
}

inline static void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}
#else							/* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */
/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
	int			numLocks;

	/*
	 * Possibly this logic should be spread out among the affected
	 * modules, the same way that shmem space estimation is done.  But for
	 * now, there are few enough users of LWLocks that we can get away
	 * with just keeping the knowledge here.
	 */

	/* Predefined LWLocks */
	numLocks = (int) NumFixedLWLocks;

	/* bufmgr.c needs two for each shared buffer */
	numLocks += 2 * NBuffers;

	/* clog.c needs one per CLOG buffer + one control lock */
	numLocks += NUM_CLOG_BUFFERS + 1;

	/* Perhaps create a few more for use by user-defined modules? */

	return numLocks;
}
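/*
 * Worked example (hypothetical values, purely illustrative): with
 * NBuffers = 64 and NUM_CLOG_BUFFERS = 8, NumLWLocks() would return
 * NumFixedLWLocks + 2*64 + (8 + 1) = NumFixedLWLocks + 137.
 */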
/*
 * Compute shmem space needed for LWLocks.
 */
int
LWLockShmemSize(void)
{
	int			numLocks = NumLWLocks();
	uint32		spaceLocks;

	/* Allocate the LWLocks plus space for shared allocation counter. */
	spaceLocks = numLocks * sizeof(LWLock) + 2 * sizeof(int);
	spaceLocks = MAXALIGN(spaceLocks);

	return (int) spaceLocks;
}
/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
	int			numLocks = NumLWLocks();
	uint32		spaceLocks = LWLockShmemSize();
	LWLock	   *lock;
	int			id;

	/* Allocate space */
	LWLockArray = (LWLock *) ShmemAlloc(spaceLocks);

	/*
	 * Initialize all LWLocks to "unlocked" state
	 */
	for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
	{
		SpinLockInit(&lock->mutex);
		lock->releaseOK = true;
		lock->exclusive = 0;
		lock->shared = 0;
		lock->head = NULL;
		lock->tail = NULL;
	}

	/*
	 * Initialize the dynamic-allocation counter at the end of the array
	 */
	LWLockCounter = (int *) lock;
	LWLockCounter[0] = (int) NumFixedLWLocks;
	LWLockCounter[1] = numLocks;
}
/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * NB: we do not currently try to interlock this.  Could perhaps use
 * ShmemLock spinlock if there were any need to assign LWLockIds after
 * shmem setup.
 */
LWLockId
LWLockAssign(void)
{
	if (LWLockCounter[0] >= LWLockCounter[1])
		elog(FATAL, "No more LWLockIds available");
	return (LWLockId) (LWLockCounter[0]++);
}
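/*
 * Typical use (illustrative only; "MyModuleLock" is a made-up name, not
 * part of this file): a module grabs one dynamic lock id during shared
 * memory initialization and remembers it for later acquire/release:
 *
 *		static LWLockId MyModuleLock;
 *		...
 *		MyModuleLock = LWLockAssign();
 *		...
 *		LWLockAcquire(MyModuleLock, LW_EXCLUSIVE);
 *		... touch the module's shared state ...
 *		LWLockRelease(MyModuleLock);
 */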
/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
void
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = LWLockArray + lockid;
	PGPROC	   *proc = MyProc;
	bool		retry = false;
	int			extraWaits = 0;

	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

	/*
	 * We can't wait if we haven't got a PGPROC.  This should only occur
	 * during bootstrap or shared memory initialization.  Put an Assert
	 * here to catch unsafe coding practices.
	 */
	Assert(!(proc == NULL && IsUnderPostmaster));
	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();
	/*
	 * Loop here to try to acquire lock after each time we are signaled by
	 * LWLockRelease.
	 *
	 * NOTE: it might seem better to have LWLockRelease actually grant us the
	 * lock, rather than retrying and possibly having to go back to sleep.
	 * But in practice that is no good because it means a process swap for
	 * every lock acquisition when two or more processes are contending
	 * for the same lock.  Since LWLocks are normally used to protect
	 * not-very-long sections of computation, a process needs to be able
	 * to acquire and release the same lock many times during a single CPU
	 * time slice, even in the presence of contention.  The efficiency of
	 * being able to do that outweighs the inefficiency of sometimes
	 * wasting a process dispatch cycle because the lock is not free when
	 * a released waiter finally gets to run.  See pgsql-hackers archives.
	 */
	for (;;)
	{
		bool		mustwait;

		/* Acquire mutex.  Time spent holding mutex should be short! */
		SpinLockAcquire_NoHoldoff(&lock->mutex);
		/* If retrying, allow LWLockRelease to release waiters again */
		if (retry)
			lock->releaseOK = true;

		/* If I can get the lock, do so quickly. */
		if (mode == LW_EXCLUSIVE)
		{
			if (lock->exclusive == 0 && lock->shared == 0)
			{
				lock->exclusive++;
				mustwait = false;
			}
			else
				mustwait = true;
		}
		else
		{
			if (lock->exclusive == 0)
			{
				lock->shared++;
				mustwait = false;
			}
			else
				mustwait = true;
		}

		if (!mustwait)
			break;				/* got the lock */
		/*
		 * Add myself to wait queue.
		 *
		 * If we don't have a PGPROC structure, there's no way to wait. This
		 * should never occur, since MyProc should only be null during
		 * shared memory initialization.
		 */
		if (proc == NULL)
			elog(FATAL, "LWLockAcquire: can't wait without a PGPROC structure");

		proc->lwWaiting = true;
		proc->lwExclusive = (mode == LW_EXCLUSIVE);
		proc->lwWaitLink = NULL;
		if (lock->head == NULL)
			lock->head = proc;
		else
			lock->tail->lwWaitLink = proc;
		lock->tail = proc;

		/* Can release the mutex now */
		SpinLockRelease_NoHoldoff(&lock->mutex);
		/*
		 * Wait until awakened.
		 *
		 * Since we share the process wait semaphore with the regular lock
		 * manager and ProcWaitForSignal, and we may need to acquire an
		 * LWLock while one of those is pending, it is possible that we
		 * get awakened for a reason other than being signaled by
		 * LWLockRelease.  If so, loop back and wait again.  Once we've
		 * gotten the LWLock, re-increment the sema by the number of
		 * additional signals received, so that the lock manager or signal
		 * manager will see the received signal when it next waits.
		 */
		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");

		for (;;)
		{
			/* "false" means cannot accept cancel/die interrupt here. */
			PGSemaphoreLock(&proc->sem, false);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");

		/* Now loop back and try to acquire lock again. */
		retry = true;
	}
	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/* Add lock to list of locks held by this backend */
	Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
	held_lwlocks[num_held_lwlocks++] = lockid;

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(&proc->sem);
}
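/*
 * Caller-visible note (illustrative observation): because cancel/die
 * interrupts are held off from acquire to release, a query cancel arriving
 * inside the locked section is serviced only at some later
 * CHECK_FOR_INTERRUPTS(), once the holdoff has been undone:
 *
 *		LWLockAcquire(lockid, LW_SHARED);
 *		...						(cancel/die held off in here)
 *		LWLockRelease(lockid);
 *		CHECK_FOR_INTERRUPTS();	(a pending cancel can fire here)
 */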
/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = LWLockArray + lockid;
	bool		mustwait;

	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();
	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* If I can get the lock, do so quickly. */
	if (mode == LW_EXCLUSIVE)
	{
		if (lock->exclusive == 0 && lock->shared == 0)
		{
			lock->exclusive++;
			mustwait = false;
		}
		else
			mustwait = true;
	}
	else
	{
		if (lock->exclusive == 0)
		{
			lock->shared++;
			mustwait = false;
		}
		else
			mustwait = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();
		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
		return false;
	}

	/* Add lock to list of locks held by this backend */
	Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
	held_lwlocks[num_held_lwlocks++] = lockid;

	return true;
}
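/*
 * Sketch of conditional use (illustrative only; "MyLock" is hypothetical):
 *
 *		if (LWLockConditionalAcquire(MyLock, LW_SHARED))
 *		{
 *			... read shared state ...
 *			LWLockRelease(MyLock);
 *		}
 *		else
 *			... do without the lock rather than blocking ...
 */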
/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLockId lockid)
{
	volatile LWLock *lock = LWLockArray + lockid;
	PGPROC	   *head;
	PGPROC	   *proc;
	int			i;

	PRINT_LWDEBUG("LWLockRelease", lockid, lock);

	/*
	 * Remove lock from list of locks held.  Usually, but not always, it
	 * will be the latest-acquired lock; so search array backwards.
	 */
	for (i = num_held_lwlocks; --i >= 0;)
	{
		if (lockid == held_lwlocks[i])
			break;
	}
	if (i < 0)
		elog(ERROR, "LWLockRelease: lock %d is not held", (int) lockid);
	num_held_lwlocks--;
	for (; i < num_held_lwlocks; i++)
		held_lwlocks[i] = held_lwlocks[i + 1];
	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* Release my hold on lock */
	if (lock->exclusive > 0)
		lock->exclusive--;
	else
	{
		Assert(lock->shared > 0);
		lock->shared--;
	}
	/*
	 * See if I need to awaken any waiters.  If I released a non-last
	 * shared hold, there cannot be anything to do.  Also, do not awaken
	 * any waiters if someone has already awakened waiters that haven't
	 * yet acquired the lock.
	 */
	head = lock->head;
	if (head != NULL)
	{
		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
		{
			/*
			 * Remove the to-be-awakened PGPROCs from the queue.  If the
			 * front waiter wants exclusive lock, awaken him only.
			 * Otherwise awaken as many waiters as want shared access.
			 */
			proc = head;
			if (!proc->lwExclusive)
			{
				while (proc->lwWaitLink != NULL &&
					   !proc->lwWaitLink->lwExclusive)
					proc = proc->lwWaitLink;
			}
			/* proc is now the last PGPROC to be released */
			lock->head = proc->lwWaitLink;
			proc->lwWaitLink = NULL;
			/* prevent additional wakeups until retryer gets to run */
			lock->releaseOK = false;
		}
		else
		{
			/* lock is still held, can't awaken anything */
			head = NULL;
		}
	}
	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/*
	 * Awaken any waiters I removed from the queue.
	 */
	while (head != NULL)
	{
		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
		proc = head;
		head = proc->lwWaitLink;
		proc->lwWaitLink = NULL;
		proc->lwWaiting = false;
		PGSemaphoreUnlock(&proc->sem);
	}

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();
}
/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after elog(ERROR).  An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery.  We could
 * decrement it below zero if we allowed it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
	while (num_held_lwlocks > 0)
	{
		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */

		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
	}
}