/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * an LWLock to protect its shared state.
 *
 * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.31 2005/10/07 20:11:03 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/slru.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"
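
/*
 * Typical usage (illustrative sketch only, not part of the original file):
 * callers bracket access to a shared-memory structure with LWLockAcquire
 * and LWLockRelease, choosing LW_EXCLUSIVE for read/write access and
 * LW_SHARED for read-only access.  "SomeSharedStruct" and "ExampleLock"
 * are hypothetical names; the NOT_USED guard keeps this sketch out of the
 * build.
 */
#ifdef NOT_USED
static void
example_update_shared(SomeSharedStruct *shared, int newval)
{
	LWLockAcquire(ExampleLock, LW_EXCLUSIVE);	/* read/write access */
	shared->value = newval;
	LWLockRelease(ExampleLock);

	LWLockAcquire(ExampleLock, LW_SHARED);		/* read-only access */
	(void) shared->value;
	LWLockRelease(ExampleLock);
}
#endif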
typedef struct LWLock
{
	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
	bool		releaseOK;		/* T if ok to release waiters */
	char		exclusive;		/* # of exclusive holders (0 or 1) */
	int			shared;			/* # of shared holders (0..MaxBackends) */
	PGPROC	   *head;			/* head of list of waiting PGPROCs */
	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
	/* tail is undefined when head is NULL */
} LWLock;
/*
 * All the LWLock structs are allocated as an array in shared memory.
 * (LWLockIds are indexes into the array.)  We force the array stride to
 * be a power of 2, which saves a few cycles in indexing, but more
 * importantly also ensures that individual LWLocks don't cross cache line
 * boundaries.  This reduces cache contention problems, especially on AMD
 * Opterons.  (Of course, we have to also ensure that the array start
 * address is suitably aligned.)
 *
 * LWLock is between 16 and 32 bytes on all known platforms, so these two
 * cases are sufficient.
 */
#define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 16 ? 16 : 32)
typedef union LWLockPadded
{
	LWLock		lock;
	char		pad[LWLOCK_PADDED_SIZE];
} LWLockPadded;
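
/*
 * Illustration (hypothetical sizes, not taken from this file): if LWLock is
 * 24 bytes, LWLOCK_PADDED_SIZE evaluates to 32, so &LWLockArray[lockid].lock
 * is simply the array base plus (lockid << 5).  With a 64-byte cache line,
 * each line then holds exactly two padded locks and no lock straddles a
 * line boundary, which is the contention benefit described above.
 */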
/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;

/* shared counter for dynamic allocation of LWLockIds */
static int *LWLockCounter;
/*
 * We use this array to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	100

static int	num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
#ifdef LOCK_DEBUG
bool		Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
			 where, (int) lockid, (int) lock->exclusive, lock->shared,
			 lock->head, (int) lock->releaseOK);
}

inline static void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}
#else							/* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */
/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
	int			numLocks;

	/*
	 * Possibly this logic should be spread out among the affected
	 * modules, the same way that shmem space estimation is done.  But for
	 * now, there are few enough users of LWLocks that we can get away
	 * with just keeping the knowledge here.
	 */

	/* Predefined LWLocks */
	numLocks = (int) NumFixedLWLocks;

	/* bufmgr.c needs two for each shared buffer */
	numLocks += 2 * NBuffers;

	/* clog.c needs one per CLOG buffer */
	numLocks += NUM_SLRU_BUFFERS;

	/* subtrans.c needs one per SubTrans buffer */
	numLocks += NUM_SLRU_BUFFERS;

	/*
	 * multixact.c needs one per MultiXact buffer, but there are
	 * two SLRU areas for MultiXact
	 */
	numLocks += 2 * NUM_SLRU_BUFFERS;

	/* Leave a few extra for use by user-defined modules. */
	numLocks += NUM_USER_DEFINED_LWLOCKS;

	return numLocks;
}
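
/*
 * Worked example (hypothetical values, for illustration only): with
 * NBuffers = 1000 and NUM_SLRU_BUFFERS = 8, the dynamic part above adds
 * 2*1000 + 8 + 8 + 2*8 = 2032 locks, so NumLWLocks() returns
 * NumFixedLWLocks + 2032 + NUM_USER_DEFINED_LWLOCKS.
 */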
/*
 * Compute shmem space needed for LWLocks.
 */
Size
LWLockShmemSize(void)
{
	Size		size;
	int			numLocks = NumLWLocks();

	/* Space for the LWLock array. */
	size = mul_size(numLocks, sizeof(LWLockPadded));

	/* Space for shared allocation counter, plus room for alignment. */
	size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);

	return size;
}
/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
	int			numLocks = NumLWLocks();
	Size		spaceLocks = LWLockShmemSize();
	LWLockPadded *lock;
	int			id;
	char	   *ptr;

	ptr = (char *) ShmemAlloc(spaceLocks);

	/* Ensure desired alignment of LWLock array */
	ptr += LWLOCK_PADDED_SIZE - ((unsigned long) ptr) % LWLOCK_PADDED_SIZE;
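
	/*
	 * Worked example (hypothetical address, for illustration only): with
	 * LWLOCK_PADDED_SIZE = 32 and a ShmemAlloc result whose remainder mod 32
	 * is 20, the statement above advances ptr by 12 bytes to the next
	 * 32-byte boundary.  If the address is already aligned, ptr advances by
	 * a full 32 bytes, which is why LWLockShmemSize reserves an extra
	 * LWLOCK_PADDED_SIZE of slop.
	 */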
	LWLockArray = (LWLockPadded *) ptr;

	/*
	 * Initialize all LWLocks to "unlocked" state
	 */
	for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
	{
		SpinLockInit(&lock->lock.mutex);
		lock->lock.releaseOK = true;
		lock->lock.exclusive = 0;
		lock->lock.shared = 0;
		lock->lock.head = NULL;
		lock->lock.tail = NULL;
	}

	/*
	 * Initialize the dynamic-allocation counter at the end of the array
	 */
	LWLockCounter = (int *) lock;
	LWLockCounter[0] = (int) NumFixedLWLocks;
	LWLockCounter[1] = numLocks;
}
/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * NB: we do not currently try to interlock this.  Could perhaps use
 * ShmemLock spinlock if there were any need to assign LWLockIds after
 * shmem setup.
 */
LWLockId
LWLockAssign(void)
{
	if (LWLockCounter[0] >= LWLockCounter[1])
		elog(FATAL, "no more LWLockIds available");
	return (LWLockId) (LWLockCounter[0]++);
}
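
/*
 * Illustrative sketch (hypothetical names, not part of this file): a
 * user-defined module claims one of the NUM_USER_DEFINED_LWLOCKS slots
 * during its shared-memory setup and remembers the assigned id for later
 * LWLockAcquire/LWLockRelease calls.  Guarded with NOT_USED so it is never
 * compiled.
 */
#ifdef NOT_USED
static LWLockId MyModuleLock;

static void
mymodule_shmem_init(void)
{
	MyModuleLock = LWLockAssign();
}
#endif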
/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
LWLockAcquire(LWLockId lockid, LWLockMode mode)
	volatile LWLock *lock = &(LWLockArray[lockid].lock);
	PGPROC	   *proc = MyProc;

	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

	/*
	 * We can't wait if we haven't got a PGPROC.  This should only occur
	 * during bootstrap or shared memory initialization.  Put an Assert
	 * here to catch unsafe coding practices.
	 */
	Assert(!(proc == NULL && IsUnderPostmaster));

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");
	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */

	/*
	 * Loop here to try to acquire lock after each time we are signaled by
	 * LWLockRelease.
	 *
	 * NOTE: it might seem better to have LWLockRelease actually grant us the
	 * lock, rather than retrying and possibly having to go back to sleep.
	 * But in practice that is no good because it means a process swap for
	 * every lock acquisition when two or more processes are contending
	 * for the same lock.  Since LWLocks are normally used to protect
	 * not-very-long sections of computation, a process needs to be able
	 * to acquire and release the same lock many times during a single CPU
	 * time slice, even in the presence of contention.  The efficiency of
	 * being able to do that outweighs the inefficiency of sometimes
	 * wasting a process dispatch cycle because the lock is not free when
	 * a released waiter finally gets to run.  See the pgsql-hackers
	 * archives.
	 */
	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* If retrying, allow LWLockRelease to release waiters again */
	lock->releaseOK = true;

	/* If I can get the lock, do so quickly. */
	if (mode == LW_EXCLUSIVE)
		if (lock->exclusive == 0 && lock->shared == 0)
		if (lock->exclusive == 0)
	break;						/* got the lock */

	/*
	 * Add myself to wait queue.
	 *
	 * If we don't have a PGPROC structure, there's no way to wait.  This
	 * should never occur, since MyProc should only be null during
	 * shared memory initialization.
	 */
	elog(FATAL, "cannot wait without a PGPROC structure");

	proc->lwWaiting = true;
	proc->lwExclusive = (mode == LW_EXCLUSIVE);
	proc->lwWaitLink = NULL;
	if (lock->head == NULL)
	lock->tail->lwWaitLink = proc;

	/* Can release the mutex now */
	SpinLockRelease_NoHoldoff(&lock->mutex);
	/*
	 * Wait until awakened.
	 *
	 * Since we share the process wait semaphore with the regular lock
	 * manager and ProcWaitForSignal, and we may need to acquire an
	 * LWLock while one of those is pending, it is possible that we
	 * get awakened for a reason other than being signaled by
	 * LWLockRelease.  If so, loop back and wait again.  Once we've
	 * gotten the LWLock, re-increment the sema by the number of
	 * additional signals received, so that the lock manager or signal
	 * manager will see the received signal when it next waits.
	 */
	LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");

	/* "false" means cannot accept cancel/die interrupt here. */
	PGSemaphoreLock(&proc->sem, false);
	if (!proc->lwWaiting)

	LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");

	/* Now loop back and try to acquire lock again. */

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/* Add lock to list of locks held by this backend */
	held_lwlocks[num_held_lwlocks++] = lockid;

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(&proc->sem);
/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
	volatile LWLock *lock = &(LWLockArray[lockid].lock);

	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* If I can get the lock, do so quickly. */
	if (mode == LW_EXCLUSIVE)
		if (lock->exclusive == 0 && lock->shared == 0)
		if (lock->exclusive == 0)

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/* Failed to get lock, so release interrupt holdoff */
	LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");

	/* Add lock to list of locks held by this backend */
	held_lwlocks[num_held_lwlocks++] = lockid;
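
/*
 * Illustrative sketch (hypothetical names, not part of this file): the
 * conditional form lets a caller do useful work instead of sleeping when
 * the lock is busy.  Guarded with NOT_USED so it is never compiled.
 */
#ifdef NOT_USED
static bool
example_try_update(SomeSharedStruct *shared, int newval)
{
	if (!LWLockConditionalAcquire(ExampleLock, LW_EXCLUSIVE))
		return false;			/* lock busy; caller can retry later */
	shared->value = newval;
	LWLockRelease(ExampleLock);
	return true;
}
#endif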
/*
 * LWLockRelease - release a previously acquired lock
 */
LWLockRelease(LWLockId lockid)
	volatile LWLock *lock = &(LWLockArray[lockid].lock);

	PRINT_LWDEBUG("LWLockRelease", lockid, lock);

	/*
	 * Remove lock from list of locks held.  Usually, but not always, it
	 * will be the latest-acquired lock; so search array backwards.
	 */
	for (i = num_held_lwlocks; --i >= 0;)
		if (lockid == held_lwlocks[i])

	elog(ERROR, "lock %d is not held", (int) lockid);

	for (; i < num_held_lwlocks; i++)
		held_lwlocks[i] = held_lwlocks[i + 1];

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* Release my hold on lock */
	if (lock->exclusive > 0)
	Assert(lock->shared > 0);

	/*
	 * See if I need to awaken any waiters.  If I released a non-last
	 * shared hold, there cannot be anything to do.  Also, do not awaken
	 * any waiters if someone has already awakened waiters that haven't
	 * yet acquired the lock.
	 */
	if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
		/*
		 * Remove the to-be-awakened PGPROCs from the queue.  If the
		 * front waiter wants exclusive lock, awaken only that one;
		 * otherwise awaken as many waiters as want shared access.
		 */
		if (!proc->lwExclusive)
			while (proc->lwWaitLink != NULL &&
				   !proc->lwWaitLink->lwExclusive)
				proc = proc->lwWaitLink;

		/* proc is now the last PGPROC to be released */
		lock->head = proc->lwWaitLink;
		proc->lwWaitLink = NULL;
		/* prevent additional wakeups until retryer gets to run */
		lock->releaseOK = false;

		/* lock is still held, can't awaken anything */

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/*
	 * Awaken any waiters I removed from the queue.
	 */
	LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
	head = proc->lwWaitLink;
	proc->lwWaitLink = NULL;
	proc->lwWaiting = false;
	PGSemaphoreUnlock(&proc->sem);

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR).  An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery.  We could
 * decrement it below zero if we allowed it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
	while (num_held_lwlocks > 0)
	{
		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */

		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
	}
}
/*
 * LWLockHeldByMe - test whether my process currently holds a lock
 *
 * This is meant as debug support only.  We do not distinguish whether the
 * lock is held shared or exclusive.
 */
bool
LWLockHeldByMe(LWLockId lockid)
{
	int			i;

	for (i = 0; i < num_held_lwlocks; i++)
		if (held_lwlocks[i] == lockid)
			return true;
	return false;
}
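
/*
 * Illustrative sketch (hypothetical names, not part of this file): callers
 * typically use LWLockHeldByMe in assertions to document which lock is
 * expected to protect a piece of shared state.  Guarded with NOT_USED so it
 * is never compiled.
 */
#ifdef NOT_USED
static void
example_assert_locked(SomeSharedStruct *shared)
{
	Assert(LWLockHeldByMe(ExampleLock));
	(void) shared->value;
}
#endif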