1 /*-------------------------------------------------------------------------
4 * Lightweight lock manager
6 * Lightweight locks are intended primarily to provide mutual exclusion of
7 * access to shared-memory data structures. Therefore, they offer both
8 * exclusive and shared lock modes (to support read/write and read-only
9 * access to a shared object). There are few other frammishes. User-level
10 * locking should be done with the full lock manager --- which depends on
11 * an LWLock to protect its shared state.
14 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
15 * Portions Copyright (c) 1994, Regents of the University of California
18 * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.3 2001/11/05 17:46:28 momjian Exp $
20 *-------------------------------------------------------------------------
24 #include "access/clog.h"
25 #include "storage/lwlock.h"
26 #include "storage/proc.h"
27 #include "storage/spin.h"
/*
 * Fields of the shared-memory LWLock structure.
 *
 * NOTE(review): this extract shows only the member declarations; the
 * enclosing "typedef struct LWLock { ... }" opener/closer lines are not
 * visible here.  The spinlock 'mutex' serializes all access to the other
 * fields and to the queue of waiting PROCs linked via 'head'/'tail'.
 */
32 slock_t mutex; /* Protects LWLock and queue of PROCs */
33 char exclusive; /* # of exclusive holders (0 or 1) */
34 int shared; /* # of shared holders (0..MaxBackends) */
35 PROC *head; /* head of list of waiting PROCs */
36 PROC *tail; /* tail of list of waiting PROCs */
37 /* tail is undefined when head is NULL */
/*
 * Global state for the LWLock subsystem.
 *
 * LWLockArray lives in shared memory; each backend inherits the pointer
 * across fork() from the postmaster, so an LWLockId is simply an index
 * into this array (see CreateLWLocks, which also places the two-int
 * LWLockCounter immediately after the last lock in the same allocation:
 * counter[0] = next free id, counter[1] = total number of locks).
 */
41 * This points to the array of LWLocks in shared memory. Backends inherit
42 * the pointer by fork from the postmaster. LWLockIds are indexes into
45 static LWLock *LWLockArray = NULL;
47 /* shared counter for dynamic allocation of LWLockIds */
48 static int *LWLockCounter;
/*
 * Per-backend bookkeeping of currently-held LWLocks, so that
 * LWLockReleaseAll can release everything during error recovery.
 * These are process-local (not shared memory): each backend tracks
 * only its own holdings.
 */
52 * We use this structure to keep track of locked LWLocks for release
53 * during error recovery. The maximum size could be determined at runtime
54 * if necessary, but it seems unlikely that more than a few locks could
55 * ever be held simultaneously.
57 #define MAX_SIMUL_LWLOCKS 100
/* number of entries currently valid in held_lwlocks[] */
59 static int num_held_lwlocks = 0;
60 static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
/* GUC-style debug flag; presumably enables lock tracing under LOCK_DEBUG
 * — TODO confirm against the missing #ifdef block */
64 bool Trace_lwlocks = false;
/*
 * Debug helper: dump an LWLock's state (exclusive count, shared count,
 * wait-queue head) via elog(DEBUG) when compiled with LOCK_DEBUG;
 * compiles away to nothing otherwise.
 *
 * NOTE(review): the extract omits the function's opening lines (the
 * "#ifdef LOCK_DEBUG", return type, and the elog argument line that
 * passes 'where' and 'lockid') — only the tail of the definition and
 * the #else stub are visible here.
 */
67 PRINT_LWDEBUG(const char *where, LWLockId lockid, const LWLock *lock)
70 elog(DEBUG, "%s(%d): excl %d shared %d head %p",
72 (int) lock->exclusive, lock->shared, lock->head);
75 #else /* not LOCK_DEBUG */
76 #define PRINT_LWDEBUG(a,b,c)
77 #endif /* LOCK_DEBUG */
/*
 * Compute the total number of LWLocks the system needs:
 * the fixed set (NumFixedLWLocks) + two per shared buffer for bufmgr.c
 * + one per CLOG buffer for clog.c.
 *
 * NOTE(review): the function signature line (presumably
 * "static int NumLWLocks(void)") and the declaration of 'numLocks'
 * are not visible in this extract — confirm against the full file.
 */
81 * Compute number of LWLocks to allocate.
89 * Possibly this logic should be spread out among the affected
90 * modules, the same way that shmem space estimation is done. But for
91 * now, there are few enough users of LWLocks that we can get away
92 * with just keeping the knowledge here.
95 /* Predefined LWLocks */
96 numLocks = (int) NumFixedLWLocks;
98 /* bufmgr.c needs two for each shared buffer */
99 numLocks += 2 * NBuffers;
101 /* clog.c needs one per CLOG buffer */
102 numLocks += NUM_CLOG_BUFFERS;
104 /* Perhaps create a few more for use by user-defined modules? */
/*
 * Compute the shared-memory space needed for all LWLocks.
 *
 * Sizes the lock array itself plus two trailing ints that hold the
 * dynamic-allocation counter (see CreateLWLocks), rounded up with
 * MAXALIGN so the allocation is suitably aligned for whatever follows.
 * NOTE(review): the return-type line and the 'spaceLocks' declaration
 * are not visible in this extract.
 */
111 * Compute shmem space needed for LWLocks.
114 LWLockShmemSize(void)
116 int numLocks = NumLWLocks();
119 /* Allocate the LWLocks plus space for shared allocation counter. */
120 spaceLocks = numLocks * sizeof(LWLock) + 2 * sizeof(int);
121 spaceLocks = MAXALIGN(spaceLocks);
123 return (int) spaceLocks;
/*
 * Allocate the shared-memory LWLock array and initialize every lock to
 * the unlocked state.  Also sets up the two-int dynamic-allocation
 * counter that lives immediately past the last lock in the same
 * allocation: LWLockCounter[0] = next LWLockId available for dynamic
 * assignment (everything below NumFixedLWLocks is predefined),
 * LWLockCounter[1] = total number of locks allocated.
 *
 * NOTE(review): the function signature, local declarations of 'id' and
 * 'lock', and the loop-body lines that zero exclusive/shared/head/tail
 * are not visible in this extract.
 */
128 * Allocate shmem space for LWLocks and initialize the locks.
133 int numLocks = NumLWLocks();
134 uint32 spaceLocks = LWLockShmemSize();
139 LWLockArray = (LWLock *) ShmemAlloc(spaceLocks);
142 * Initialize all LWLocks to "unlocked" state
144 for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
146 SpinLockInit(&lock->mutex);
154 * Initialize the dynamic-allocation counter at the end of the array
/* 'lock' now points one past the last LWLock — reuse that address */
156 LWLockCounter = (int *) lock;
157 LWLockCounter[0] = (int) NumFixedLWLocks;
158 LWLockCounter[1] = numLocks;
/*
 * Hand out the next dynamically-allocatable LWLockId, failing with
 * elog(FATAL) when the preallocated pool (LWLockCounter[1]) is
 * exhausted.  Deliberately unsynchronized: assignment is expected to
 * happen only during single-process initialization (see comment below).
 */
163 * LWLockAssign - assign a dynamically-allocated LWLock number
165 * NB: we do not currently try to interlock this. Could perhaps use
166 * ShmemLock spinlock if there were any need to assign LWLockIds after
172 if (LWLockCounter[0] >= LWLockCounter[1])
173 elog(FATAL, "No more LWLockIds available");
174 return (LWLockId) (LWLockCounter[0]++);
/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode,
 * sleeping on the process semaphore until the lock is granted.
 *
 * Flow visible in this extract:
 *   1. HOLD_INTERRUPTS (line elided) so cancel/die cannot fire while a
 *      shared data structure is mid-update; released at LWLockRelease.
 *   2. Take the per-lock spinlock, try the fast path: EXCLUSIVE needs
 *      exclusive==0 && shared==0; SHARED additionally refuses to jump
 *      ahead of queued waiters (head != NULL) to avoid writer starvation.
 *   3. Otherwise enqueue MyProc on the lock's wait list, drop the
 *      spinlock, and block on the process semaphore; spurious wakeups
 *      (the semaphore is shared with the heavyweight lock manager and
 *      ProcWaitForSignal) are counted and re-posted afterward.
 *   4. Record the lock in held_lwlocks[] for error-recovery release.
 *
 * NOTE(review): many interior lines are elided in this extract — the
 * grant assignments (exclusive++ / shared++), the 'mustwait' flag and
 * retry loop structure, the extraWaits increment, and several braces.
 * Do not treat this fragment as the complete control flow.
 */
179 * LWLockAcquire - acquire a lightweight lock in the specified mode
181 * If the lock is not available, sleep until it is.
183 * Side effect: cancel/die interrupts are held off until lock release.
186 LWLockAcquire(LWLockId lockid, LWLockMode mode)
188 LWLock *lock = LWLockArray + lockid;
191 PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
194 * Lock out cancel/die interrupts until we exit the code section
195 * protected by the LWLock. This ensures that interrupts will not
196 * interfere with manipulations of data structures in shared memory.
200 /* Acquire mutex. Time spent holding mutex should be short! */
201 SpinLockAcquire_NoHoldoff(&lock->mutex);
203 /* If I can get the lock, do so quickly. */
204 if (mode == LW_EXCLUSIVE)
206 if (lock->exclusive == 0 && lock->shared == 0)
217 * If there is someone waiting (presumably for exclusive access),
218 * queue up behind him even though I could get the lock. This
219 * prevents a stream of read locks from starving a writer.
221 if (lock->exclusive == 0 && lock->head == NULL)
232 /* Add myself to wait queue */
237 * If we don't have a PROC structure, there's no way to wait. This
238 * should never occur, since MyProc should only be null during
239 * shared memory initialization.
242 elog(FATAL, "LWLockAcquire: can't wait without a PROC structure");
244 proc->lwWaiting = true;
245 proc->lwExclusive = (mode == LW_EXCLUSIVE);
246 proc->lwWaitLink = NULL;
247 if (lock->head == NULL)
/* non-empty queue: append after current tail */
250 lock->tail->lwWaitLink = proc;
253 /* Can release the mutex now */
254 SpinLockRelease_NoHoldoff(&lock->mutex);
257 * Wait until awakened.
259 * Since we share the process wait semaphore with the regular lock
260 * manager and ProcWaitForSignal, and we may need to acquire an
261 * LWLock while one of those is pending, it is possible that we
262 * get awakened for a reason other than being granted the LWLock.
263 * If so, loop back and wait again. Once we've gotten the lock,
264 * re-increment the sema by the number of additional signals
265 * received, so that the lock manager or signal manager will see
266 * the received signal when it next waits.
270 /* "false" means cannot accept cancel/die interrupt here. */
271 IpcSemaphoreLock(proc->sem.semId, proc->sem.semNum, false);
272 if (!proc->lwWaiting)
278 * The awakener already updated the lock struct's state, so we
279 * don't need to do anything more to it. Just need to fix the
282 while (extraWaits-- > 0)
283 IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum);
287 /* Got the lock without waiting */
288 SpinLockRelease_NoHoldoff(&lock->mutex);
291 /* Add lock to list of locks held by this backend */
292 Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
293 held_lwlocks[num_held_lwlocks++] = lockid;
/*
 * LWLockConditionalAcquire - non-blocking variant of LWLockAcquire.
 *
 * Uses the identical grant tests as LWLockAcquire (including the
 * queue-behind-waiters rule that prevents reader streams from starving
 * a writer), but instead of enqueueing and sleeping on failure it
 * releases the spinlock, resumes interrupts, and returns false.
 * On success interrupts stay held off until the matching LWLockRelease,
 * and the lock is recorded in held_lwlocks[].
 *
 * NOTE(review): the grant assignments, the 'mustwait' bookkeeping, the
 * RESUME_INTERRUPTS call, and the return statements are elided in this
 * extract — only the skeleton is visible.
 */
297 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
299 * If the lock is not available, return FALSE with no side-effects.
301 * If successful, cancel/die interrupts are held off until lock release.
304 LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
306 LWLock *lock = LWLockArray + lockid;
309 PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
312 * Lock out cancel/die interrupts until we exit the code section
313 * protected by the LWLock. This ensures that interrupts will not
314 * interfere with manipulations of data structures in shared memory.
318 /* Acquire mutex. Time spent holding mutex should be short! */
319 SpinLockAcquire_NoHoldoff(&lock->mutex);
321 /* If I can get the lock, do so quickly. */
322 if (mode == LW_EXCLUSIVE)
324 if (lock->exclusive == 0 && lock->shared == 0)
335 * If there is someone waiting (presumably for exclusive access),
336 * queue up behind him even though I could get the lock. This
337 * prevents a stream of read locks from starving a writer.
339 if (lock->exclusive == 0 && lock->head == NULL)
348 /* We are done updating shared state of the lock itself. */
349 SpinLockRelease_NoHoldoff(&lock->mutex);
353 /* Failed to get lock, so release interrupt holdoff */
358 /* Add lock to list of locks held by this backend */
359 Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
360 held_lwlocks[num_held_lwlocks++] = lockid;
/*
 * LWLockRelease - release a previously acquired lock and wake waiters.
 *
 * Steps visible in this extract:
 *   1. Remove lockid from held_lwlocks[] (searched backwards, since the
 *      most recently acquired lock is the common case) and compact the
 *      array; elog(ERROR) if we don't actually hold it.
 *   2. Under the per-lock spinlock, decrement the appropriate hold count
 *      (exclusive vs. shared — the decrement lines themselves are elided
 *      in this extract).
 *   3. If the lock became fully free and waiters are queued, detach the
 *      wakeable prefix of the queue: either the single exclusive waiter
 *      at the head, or a maximal run of consecutive shared waiters.
 *   4. After dropping the spinlock, clear each removed PROC's lwWaiting
 *      flag and post its semaphore to wake it.
 *   5. Resume cancel/die interrupts (balancing the holdoff taken at
 *      acquire time).
 *
 * NOTE(review): local declarations ('head', 'proc', 'i'), the hold-count
 * decrements, the wakeup-loop header, and RESUME_INTERRUPTS are elided
 * in this extract.
 */
367 * LWLockRelease - release a previously acquired lock
370 LWLockRelease(LWLockId lockid)
372 LWLock *lock = LWLockArray + lockid;
377 PRINT_LWDEBUG("LWLockRelease", lockid, lock);
380 * Remove lock from list of locks held. Usually, but not always, it
381 * will be the latest-acquired lock; so search array backwards.
383 for (i = num_held_lwlocks; --i >= 0;)
385 if (lockid == held_lwlocks[i])
/* i < 0 here means the lock was never recorded as held by us */
389 elog(ERROR, "LWLockRelease: lock %d is not held", (int) lockid);
/* compact held_lwlocks[] over the removed slot */
391 for (; i < num_held_lwlocks; i++)
392 held_lwlocks[i] = held_lwlocks[i + 1];
394 /* Acquire mutex. Time spent holding mutex should be short! */
395 SpinLockAcquire_NoHoldoff(&lock->mutex);
397 /* Release my hold on lock */
398 if (lock->exclusive > 0)
402 Assert(lock->shared > 0);
407 * See if I need to awaken any waiters. If I released a non-last
408 * shared hold, there cannot be anything to do.
413 if (lock->exclusive == 0 && lock->shared == 0)
416 * Remove the to-be-awakened PROCs from the queue, and update
417 * the lock state to show them as holding the lock.
420 if (proc->lwExclusive)
/* head waiter wants shared: also wake every consecutive
 * shared waiter behind it */
425 while (proc->lwWaitLink != NULL &&
426 !proc->lwWaitLink->lwExclusive)
428 proc = proc->lwWaitLink;
432 /* proc is now the last PROC to be released */
433 lock->head = proc->lwWaitLink;
434 proc->lwWaitLink = NULL;
438 /* lock is still held, can't awaken anything */
443 /* We are done updating shared state of the lock itself. */
444 SpinLockRelease_NoHoldoff(&lock->mutex);
447 * Awaken any waiters I removed from the queue.
/* clear lwWaiting BEFORE posting the semaphore, so the waiter's
 * wakeup test in LWLockAcquire sees the grant */
452 head = proc->lwWaitLink;
453 proc->lwWaitLink = NULL;
454 proc->lwWaiting = false;
455 IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum);
459 * Now okay to allow cancel/die interrupts.
/*
 * LWLockReleaseAll - release every lock this backend holds, for cleanup
 * after elog(ERROR).
 *
 * Each LWLockRelease call decrements InterruptHoldoffCount, but error
 * recovery has already forced that counter to its proper level; so we
 * pre-HOLD_INTERRUPTS before each release to keep the net effect zero
 * (and to avoid driving the counter below zero).
 *
 * NOTE(review): the extract ends mid-loop — the closing brace(s) of the
 * while loop and function are not visible here.
 */
466 * LWLockReleaseAll - release all currently-held locks
468 * Used to clean up after elog(ERROR). An important difference between this
469 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
470 * unchanged by this operation. This is necessary since InterruptHoldoffCount
471 * has been set to an appropriate level earlier in error recovery. We could
472 * decrement it below zero if we allow it to drop for each released lock!
475 LWLockReleaseAll(void)
477 while (num_held_lwlocks > 0)
479 HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
481 LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);