/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * LWLocks to protect its shared state.
 *
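 * A typical call sequence looks like this (illustrative sketch only;
 * "MyLock" stands for whichever LWLockId the caller owns):
 *
 *		LWLockAcquire(MyLock, LW_SHARED);
 *		... examine the shared data structure ...
 *		LWLockRelease(MyLock);
 *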
 * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.39 2006/04/21 16:45:12 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/clog.h"
#include "access/multixact.h"
#include "access/subtrans.h"
#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"

/* We use the ShmemLock spinlock to protect LWLockAssign */
extern slock_t *ShmemLock;

typedef struct LWLock
{
	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
	bool		releaseOK;		/* T if ok to release waiters */
	char		exclusive;		/* # of exclusive holders (0 or 1) */
	int			shared;			/* # of shared holders (0..MaxBackends) */
	PGPROC	   *head;			/* head of list of waiting PGPROCs */
	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
	/* tail is undefined when head is NULL */
} LWLock;

/*
 * All the LWLock structs are allocated as an array in shared memory.
 * (LWLockIds are indexes into the array.)  We force the array stride to
 * be a power of 2, which saves a few cycles in indexing, but more
 * importantly also ensures that individual LWLocks don't cross cache line
 * boundaries.  This reduces cache contention problems, especially on AMD
 * Opterons.  (Of course, we have to also ensure that the array start
 * address is suitably aligned.)
 *
 * LWLock is between 16 and 32 bytes on all known platforms, so these two
 * cases are sufficient.
 */
#define LWLOCK_PADDED_SIZE	(sizeof(LWLock) <= 16 ? 16 : 32)

typedef union LWLockPadded
{
	LWLock		lock;
	char		pad[LWLOCK_PADDED_SIZE];
} LWLockPadded;
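
/*
 * Because the stride is a power of 2, indexing needs only a shift:
 * &LWLockArray[id].lock is the (aligned) array base plus
 * id << log2(LWLOCK_PADDED_SIZE).  With a 32-byte stride, for example,
 * lock 5 sits at byte offset 160 and cannot straddle any cache line of
 * 32 bytes or larger.
 */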

/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
 * where we have special measures to pass it down).
 */
NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;

/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	100

static int	num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];

#ifdef LWLOCK_STATS
static int	counts_for_pid = 0;
static int *sh_acquire_counts;
static int *ex_acquire_counts;
static int *block_counts;
#endif   /* LWLOCK_STATS */

#ifdef LOCK_DEBUG
bool		Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
			 where, (int) lockid,
			 (int) lock->exclusive, lock->shared, lock->head,
			 (int) lock->releaseOK);
}

inline static void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}
#else							/* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */

#ifdef LWLOCK_STATS
static void
print_lwlock_stats(int code, Datum arg)
{
	int			i;
	int		   *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
	int			numLocks = LWLockCounter[1];

	/* Grab an LWLock to keep different backends from mixing reports */
	LWLockAcquire(0, LW_EXCLUSIVE);

	for (i = 0; i < numLocks; i++)
	{
		if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i])
			fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u\n",
					MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
					block_counts[i]);
	}

	LWLockRelease(0);
}
#endif   /* LWLOCK_STATS */

/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
	int			numLocks;

	/*
	 * Possibly this logic should be spread out among the affected modules,
	 * the same way that shmem space estimation is done.  But for now, there
	 * are few enough users of LWLocks that we can get away with just keeping
	 * the knowledge here.
	 */

	/* Predefined LWLocks */
	numLocks = (int) FirstLockMgrLock;

	/* lock.c gets the ones starting at FirstLockMgrLock */
	numLocks += NUM_LOCK_PARTITIONS;

	/* bufmgr.c needs two for each shared buffer */
	numLocks += 2 * NBuffers;

	/* clog.c needs one per CLOG buffer */
	numLocks += NUM_CLOG_BUFFERS;

	/* subtrans.c needs one per SubTrans buffer */
	numLocks += NUM_SUBTRANS_BUFFERS;

	/* multixact.c needs two SLRU areas */
	numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;

	/* Leave a few extra for use by user-defined modules. */
	numLocks += NUM_USER_DEFINED_LWLOCKS;

	return numLocks;
}
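
/*
 * As a worked example (the figure is illustrative, not a default): a server
 * configured with NBuffers = 1000 contributes 2000 locks from bufmgr.c
 * alone, on top of the predefined locks and the SLRU buffer locks counted
 * above.
 */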

/*
 * Compute shmem space needed for LWLocks.
 */
Size
LWLockShmemSize(void)
{
	Size		size;
	int			numLocks = NumLWLocks();

	/* Space for the LWLock array. */
	size = mul_size(numLocks, sizeof(LWLockPadded));

	/* Space for dynamic allocation counter, plus room for alignment. */
	size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);

	return size;
}

/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
	int			numLocks = NumLWLocks();
	Size		spaceLocks = LWLockShmemSize();
	LWLockPadded *lock;
	int		   *LWLockCounter;
	char	   *ptr;
	int			id;

	/* Allocate space */
	ptr = (char *) ShmemAlloc(spaceLocks);

	/* Leave room for dynamic allocation counter */
	ptr += 2 * sizeof(int);

	/* Ensure desired alignment of LWLock array */
	ptr += LWLOCK_PADDED_SIZE - ((unsigned long) ptr) % LWLOCK_PADDED_SIZE;

	LWLockArray = (LWLockPadded *) ptr;

	/*
	 * Initialize all LWLocks to "unlocked" state
	 */
	for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
	{
		SpinLockInit(&lock->lock.mutex);
		lock->lock.releaseOK = true;
		lock->lock.exclusive = 0;
		lock->lock.shared = 0;
		lock->lock.head = NULL;
		lock->lock.tail = NULL;
	}

	/*
	 * Initialize the dynamic-allocation counter, which is stored just before
	 * the first LWLock.  The LWLocks used by lock.c are not dynamically
	 * allocated; it just assumes it owns them.
	 */
	LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
	LWLockCounter[0] = (int) FirstLockMgrLock + NUM_LOCK_PARTITIONS;
	LWLockCounter[1] = numLocks;
}
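
/*
 * The resulting shared-memory layout is, schematically:
 *
 *		[counter 0][counter 1][alignment pad][lock 0][lock 1] ... [lock N-1]
 *
 * Counter 0 holds the next LWLockId to hand out via LWLockAssign; counter 1
 * holds the total number of LWLocks allocated.
 */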

/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * We interlock this using the same spinlock that is used to protect
 * ShmemAlloc().  Interlocking is not really necessary during postmaster
 * startup, but it is needed if any user-defined code tries to allocate
 * LWLocks after startup.
 */
LWLockId
LWLockAssign(void)
{
	LWLockId	result;

	/* use volatile pointer to prevent code rearrangement */
	volatile int *LWLockCounter;

	LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
	SpinLockAcquire(ShmemLock);
	if (LWLockCounter[0] >= LWLockCounter[1])
	{
		SpinLockRelease(ShmemLock);
		elog(ERROR, "no more LWLockIds available");
	}
	result = (LWLockId) (LWLockCounter[0]++);
	SpinLockRelease(ShmemLock);
	return result;
}
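
/*
 * A user-defined module would typically capture one of the spare LWLockIds
 * (see NUM_USER_DEFINED_LWLOCKS) during its shared-memory setup, along these
 * lines (illustrative sketch; "my_shared_state" is hypothetical):
 *
 *		my_shared_state->lockid = LWLockAssign();
 *
 * and thereafter pass that LWLockId to LWLockAcquire/LWLockRelease.
 */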

/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
void
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = &(LWLockArray[lockid].lock);
	PGPROC	   *proc = MyProc;
	bool		retry = false;
	int			extraWaits = 0;

	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

#ifdef LWLOCK_STATS
	/* Set up local count state first time through in a given process */
	if (counts_for_pid != MyProcPid)
	{
		int		   *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
		int			numLocks = LWLockCounter[1];

		sh_acquire_counts = calloc(numLocks, sizeof(int));
		ex_acquire_counts = calloc(numLocks, sizeof(int));
		block_counts = calloc(numLocks, sizeof(int));
		counts_for_pid = MyProcPid;
		on_shmem_exit(print_lwlock_stats, 0);
	}
	/* Count lock acquisition attempts */
	if (mode == LW_EXCLUSIVE)
		ex_acquire_counts[lockid]++;
	else
		sh_acquire_counts[lockid]++;
#endif   /* LWLOCK_STATS */

	/*
	 * We can't wait if we haven't got a PGPROC.  This should only occur
	 * during bootstrap or shared memory initialization.  Put an Assert here
	 * to catch unsafe coding practices.
	 */
	Assert(!(proc == NULL && IsUnderPostmaster));

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section protected
	 * by the LWLock.  This ensures that interrupts will not interfere with
	 * manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Loop here to try to acquire lock after each time we are signaled by
	 * LWLockRelease.
	 *
	 * NOTE: it might seem better to have LWLockRelease actually grant us the
	 * lock, rather than retrying and possibly having to go back to sleep. But
	 * in practice that is no good because it means a process swap for every
	 * lock acquisition when two or more processes are contending for the same
	 * lock.  Since LWLocks are normally used to protect not-very-long
	 * sections of computation, a process needs to be able to acquire and
	 * release the same lock many times during a single CPU time slice, even
	 * in the presence of contention.  The efficiency of being able to do that
	 * outweighs the inefficiency of sometimes wasting a process dispatch
	 * cycle because the lock is not free when a released waiter finally gets
	 * to run.  See pgsql-hackers archives for 29-Dec-01.
	 */
	for (;;)
	{
		bool		mustwait;

		/* Acquire mutex.  Time spent holding mutex should be short! */
		SpinLockAcquire(&lock->mutex);

		/* If retrying, allow LWLockRelease to release waiters again */
		if (retry)
			lock->releaseOK = true;

		/* If I can get the lock, do so quickly. */
		if (mode == LW_EXCLUSIVE)
		{
			if (lock->exclusive == 0 && lock->shared == 0)
			{
				lock->exclusive++;
				mustwait = false;
			}
			else
				mustwait = true;
		}
		else
		{
			if (lock->exclusive == 0)
			{
				lock->shared++;
				mustwait = false;
			}
			else
				mustwait = true;
		}

		if (!mustwait)
			break;				/* got the lock */

		/*
		 * Add myself to wait queue.
		 *
		 * If we don't have a PGPROC structure, there's no way to wait.  This
		 * should never occur, since MyProc should only be null during shared
		 * memory initialization.
		 */
		if (proc == NULL)
			elog(PANIC, "cannot wait without a PGPROC structure");

		proc->lwWaiting = true;
		proc->lwExclusive = (mode == LW_EXCLUSIVE);
		proc->lwWaitLink = NULL;
		if (lock->head == NULL)
			lock->head = proc;
		else
			lock->tail->lwWaitLink = proc;
		lock->tail = proc;

		/* Can release the mutex now */
		SpinLockRelease(&lock->mutex);

		/*
		 * Wait until awakened.
		 *
		 * Since we share the process wait semaphore with the regular lock
		 * manager and ProcWaitForSignal, and we may need to acquire an LWLock
		 * while one of those is pending, it is possible that we get awakened
		 * for a reason other than being signaled by LWLockRelease.  If so,
		 * loop back and wait again.  Once we've gotten the LWLock,
		 * re-increment the sema by the number of additional signals received,
		 * so that the lock manager or signal manager will see the received
		 * signal when it next waits.
		 */
		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");

#ifdef LWLOCK_STATS
		block_counts[lockid]++;
#endif

		for (;;)
		{
			/* "false" means cannot accept cancel/die interrupt here. */
			PGSemaphoreLock(&proc->sem, false);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");

		/* Now loop back and try to acquire lock again. */
		retry = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease(&lock->mutex);

	/* Add lock to list of locks held by this backend */
	held_lwlocks[num_held_lwlocks++] = lockid;

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(&proc->sem);
}

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = &(LWLockArray[lockid].lock);
	bool		mustwait;

	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);

	/* Ensure we will have room to remember the lock */
	if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
		elog(ERROR, "too many LWLocks taken");

	/*
	 * Lock out cancel/die interrupts until we exit the code section protected
	 * by the LWLock.  This ensures that interrupts will not interfere with
	 * manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire(&lock->mutex);

	/* If I can get the lock, do so quickly. */
	if (mode == LW_EXCLUSIVE)
	{
		if (lock->exclusive == 0 && lock->shared == 0)
		{
			lock->exclusive++;
			mustwait = false;
		}
		else
			mustwait = true;
	}
	else
	{
		if (lock->exclusive == 0)
		{
			lock->shared++;
			mustwait = false;
		}
		else
			mustwait = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease(&lock->mutex);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();
		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
	}
	else
	{
		/* Add lock to list of locks held by this backend */
		held_lwlocks[num_held_lwlocks++] = lockid;
	}

	return !mustwait;
}
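
/*
 * Callers typically reach for this variant when blocking might risk
 * deadlock, e.g. (illustrative sketch):
 *
 *		if (LWLockConditionalAcquire(lockid, LW_EXCLUSIVE))
 *		{
 *			... update the shared structure ...
 *			LWLockRelease(lockid);
 *		}
 *		else
 *			... back off, or fall back to some other strategy ...
 */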

/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLockId lockid)
{
	volatile LWLock *lock = &(LWLockArray[lockid].lock);
	PGPROC	   *head;
	PGPROC	   *proc;
	int			i;

	PRINT_LWDEBUG("LWLockRelease", lockid, lock);

	/*
	 * Remove lock from list of locks held.  Usually, but not always, it will
	 * be the latest-acquired lock; so search array backwards.
	 */
	for (i = num_held_lwlocks; --i >= 0;)
	{
		if (lockid == held_lwlocks[i])
			break;
	}
	if (i < 0)
		elog(ERROR, "lock %d is not held", (int) lockid);
	num_held_lwlocks--;
	for (; i < num_held_lwlocks; i++)
		held_lwlocks[i] = held_lwlocks[i + 1];

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire(&lock->mutex);

	/* Release my hold on lock */
	if (lock->exclusive > 0)
		lock->exclusive--;
	else
	{
		Assert(lock->shared > 0);
		lock->shared--;
	}

	/*
	 * See if I need to awaken any waiters.  If I released a non-last shared
	 * hold, there cannot be anything to do.  Also, do not awaken any waiters
	 * if someone has already awakened waiters that haven't yet acquired the
	 * lock.
	 */
	head = lock->head;
	if (head != NULL)
	{
		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
		{
			/*
			 * Remove the to-be-awakened PGPROCs from the queue.  If the front
			 * waiter wants exclusive lock, awaken him only.  Otherwise awaken
			 * as many waiters as want shared access.
			 */
			proc = head;
			if (!proc->lwExclusive)
			{
				while (proc->lwWaitLink != NULL &&
					   !proc->lwWaitLink->lwExclusive)
					proc = proc->lwWaitLink;
			}
			/* proc is now the last PGPROC to be released */
			lock->head = proc->lwWaitLink;
			proc->lwWaitLink = NULL;
			/* prevent additional wakeups until retryer gets to run */
			lock->releaseOK = false;
		}
		else
		{
			/* lock is still held, can't awaken anything */
			head = NULL;
		}
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease(&lock->mutex);

	/*
	 * Awaken any waiters I removed from the queue.
	 */
	while (head != NULL)
	{
		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
		proc = head;
		head = proc->lwWaitLink;
		proc->lwWaitLink = NULL;
		proc->lwWaiting = false;
		PGSemaphoreUnlock(&proc->sem);
	}

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();
}

/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR).  An important difference between
 * this function and retail LWLockRelease calls is that InterruptHoldoffCount
 * is unchanged by this operation.  This is necessary since
 * InterruptHoldoffCount has been set to an appropriate level earlier in
 * error recovery.  We could decrement it below zero if we allowed it to drop
 * for each released lock!
 */
void
LWLockReleaseAll(void)
{
	while (num_held_lwlocks > 0)
	{
		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */

		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
	}
}

/*
 * LWLockHeldByMe - test whether my process currently holds a lock
 *
 * This is meant as debug support only.  We do not distinguish whether the
 * lock is held shared or exclusive.
 */
bool
LWLockHeldByMe(LWLockId lockid)
{
	int			i;

	for (i = 0; i < num_held_lwlocks; i++)
	{
		if (held_lwlocks[i] == lockid)
			return true;
	}
	return false;
}
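
/*
 * Typical use is in sanity checks, e.g. (illustrative):
 *
 *		Assert(LWLockHeldByMe(lockid));
 */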