1 /*-------------------------------------------------------------------------
4 * Lightweight lock manager
6 * Lightweight locks are intended primarily to provide mutual exclusion of
7 * access to shared-memory data structures. Therefore, they offer both
8 * exclusive and shared lock modes (to support read/write and read-only
9 * access to a shared object). There are few other frammishes. User-level
10 * locking should be done with the full lock manager --- which depends on
11 * LWLocks to protect its shared state.
13 * In addition to exclusive and shared modes, lightweight locks can be used
14 * to wait until a variable changes value. The variable is initially set
15 * when the lock is acquired with LWLockAcquireWithVar, and can be updated
16 * without releasing the lock by calling LWLockUpdateVar. LWLockWaitForVar
17 * waits for the variable to be updated, or until the lock is free. The
18 * meaning of the variable is up to the caller, the lightweight lock code
19 * just assigns and compares it.
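 *
 * For illustration, a caller might use these routines roughly as follows
 * (a simplified sketch, not taken from any particular caller; "var",
 * "startval", "oldval" and "newval" are made-up names):
 *
 *		holder:	LWLockAcquireWithVar(lock, &var, startval);
 *				... make some progress ...
 *				LWLockUpdateVar(lock, &var, newval);	-- waiters see newval
 *				LWLockRelease(lock);
 *
 *		waiter:	while (!LWLockWaitForVar(lock, &var, oldval, &newval))
 *					oldval = newval;	-- lock still held, but the value advanced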
21 * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
22 * Portions Copyright (c) 1994, Regents of the University of California
25 * src/backend/storage/lmgr/lwlock.c
29 * This used to be a pretty straightforward reader-writer lock
30 * implementation, in which the internal state was protected by a
31 * spinlock. Unfortunately the overhead of taking the spinlock proved to be
32 * too high for workloads/locks that were taken in shared mode very
33 * frequently. Often we were spinning in the (obviously exclusive) spinlock,
34 * while trying to acquire a shared lock that was actually free.
36 * Thus a new implementation was devised that provides wait-free shared lock
37 * acquisition for locks that aren't exclusively locked.
39 * The basic idea is to have a single atomic variable 'lockcount' instead of
40 * the formerly separate shared and exclusive counters and to use atomic
41 * operations to acquire the lock. That's fairly easy to do for plain
42 * rw-spinlocks, but a lot harder for something like LWLocks that want to wait in the OS.
45 * For lock acquisition we use an atomic compare-and-exchange on the lockcount
46 * variable. For exclusive lock we swap in a sentinel value
47 * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
49 * To release the lock we use an atomic decrement. If the new value is zero
50 * (we get that atomically), we know we can/have to release all waiters.
53 * Obviously it is important that the sentinel value for exclusive locks
54 * doesn't conflict with the maximum number of possible share lockers -
55 * luckily MAX_BACKENDS makes that easily possible.
58 * The attentive reader might have noticed that naively doing the above has a
59 * glaring race condition: We try to lock using the atomic operations and
60 * notice that we have to wait. Unfortunately by the time we have finished
61 * queuing, the former locker very well might have already finished its
62 * work. That's problematic because we're now stuck waiting inside the OS.
64 * To mitigate those races we use a two-phase attempt at locking:
65 * Phase 1: Try to do it atomically; if we succeed, we're done
66 * Phase 2: Add ourselves to the waitqueue of the lock
67 * Phase 3: Try to grab the lock again; if we succeed, remove ourselves from the waitqueue
69 * Phase 4: Sleep till wake-up, goto Phase 1
71 * This protects us against the problem from above as nobody can release too
72 * quickly before we're queued, since after Phase 2 we're already queued.
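 *
 * In rough pseudocode the protocol looks like this (a simplified sketch of
 * the description above, not the actual implementation; see
 * LWLockAttemptLock(), LWLockQueueSelf() and LWLockAcquireCommon() below):
 *
 *		for (;;)
 *		{
 *			if (!LWLockAttemptLock(lock, mode))
 *				break;						-- phase 1: got it atomically
 *			LWLockQueueSelf(lock, mode);	-- phase 2: enqueue ourselves
 *			if (!LWLockAttemptLock(lock, mode))
 *			{
 *				LWLockDequeueSelf(lock);	-- phase 3: got it, undo queueing
 *				break;
 *			}
 *			sleep on our semaphore;			-- phase 4: wait for wake-up, retry
 *		}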
73 * -------------------------------------------------------------------------
77 #include "access/clog.h"
78 #include "access/commit_ts.h"
79 #include "access/multixact.h"
80 #include "access/subtrans.h"
81 #include "commands/async.h"
82 #include "miscadmin.h"
84 #include "postmaster/postmaster.h"
85 #include "replication/slot.h"
86 #include "storage/ipc.h"
87 #include "storage/predicate.h"
88 #include "storage/proc.h"
89 #include "storage/spin.h"
90 #include "utils/memutils.h"
93 #include "utils/hsearch.h"
97 /* We use the ShmemLock spinlock to protect LWLockAssign */
98 extern slock_t *ShmemLock;
100 #define LW_FLAG_HAS_WAITERS ((uint32) 1 << 30)
101 #define LW_FLAG_RELEASE_OK ((uint32) 1 << 29)
103 #define LW_VAL_EXCLUSIVE ((uint32) 1 << 24)
104 #define LW_VAL_SHARED 1
106 #define LW_LOCK_MASK ((uint32) ((1 << 25)-1))
107 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108 #define LW_SHARED_MASK ((uint32) ((1 << 24)-1))
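/*
 * With the definitions above, the state word can be pictured roughly as
 * follows (an illustrative summary derived from the flags and values just
 * defined):
 *
 *		bit 30		LW_FLAG_HAS_WAITERS		somebody is (or may be) queued
 *		bit 29		LW_FLAG_RELEASE_OK		releasing may wake up waiters
 *		bit 24		LW_VAL_EXCLUSIVE		lock is held exclusively
 *		bits 0..23	shared-holder count		incremented by LW_VAL_SHARED
 */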
111 * This is indexed by tranche ID and stores metadata for all tranches known
112 * to the current backend.
114 static LWLockTranche **LWLockTrancheArray = NULL;
115 static int LWLockTranchesAllocated = 0;
117 #define T_NAME(lock) \
118 (LWLockTrancheArray[(lock)->tranche]->name)
120 ((int) ((((char *) lock) - \
121 ((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \
122 LWLockTrancheArray[(lock)->tranche]->array_stride))
125 * This points to the main array of LWLocks in shared memory. Backends inherit
126 * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
127 * where we have special measures to pass it down).
129 LWLockPadded *MainLWLockArray = NULL;
130 static LWLockTranche MainLWLockTranche;
133 * We use this structure to keep track of locked LWLocks for release
134 * during error recovery. Normally, only a few will be held at once, but
135 * occasionally the number can be much higher; for example, the pg_buffercache
136 * extension locks all buffer partitions simultaneously.
138 #define MAX_SIMUL_LWLOCKS 200
140 /* struct representing the LWLocks we're holding */
141 typedef struct LWLockHandle
147 static int num_held_lwlocks = 0;
148 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
150 static int lock_addin_request = 0;
151 static bool lock_addin_request_allowed = true;
153 static inline bool LWLockAcquireCommon(LWLock *l, LWLockMode mode,
154 uint64 *valptr, uint64 val);
157 typedef struct lwlock_stats_key
163 typedef struct lwlock_stats
165 lwlock_stats_key key;
166 int sh_acquire_count;
167 int ex_acquire_count;
169 int dequeue_self_count;
170 int spin_delay_count;
173 static HTAB *lwlock_stats_htab;
174 static lwlock_stats lwlock_stats_dummy;
178 bool Trace_lwlocks = false;
181 PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
183 /* hide statement & context here, otherwise the log is just too verbose */
186 uint32 state = pg_atomic_read_u32(&lock->state);
189 errhidecontext(true),
190 errmsg("%d: %s(%s %d): excl %u shared %u haswaiters %u waiters %u rOK %d",
192 where, T_NAME(lock), T_ID(lock),
193 !!(state & LW_VAL_EXCLUSIVE),
194 state & LW_SHARED_MASK,
195 !!(state & LW_FLAG_HAS_WAITERS),
196 pg_atomic_read_u32(&lock->nwaiters),
197 !!(state & LW_FLAG_RELEASE_OK))));
202 LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
204 /* hide statement & context here, otherwise the log is just too verbose */
209 errhidecontext(true),
210 errmsg("%s(%s %d): %s", where, T_NAME(lock), T_ID(lock), msg)));
214 #else /* not LOCK_DEBUG */
215 #define PRINT_LWDEBUG(a,b,c) ((void)0)
216 #define LOG_LWDEBUG(a,b,c) ((void)0)
217 #endif /* LOCK_DEBUG */
221 static void init_lwlock_stats(void);
222 static void print_lwlock_stats(int code, Datum arg);
223 static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);
226 init_lwlock_stats(void)
229 static MemoryContext lwlock_stats_cxt = NULL;
230 static bool exit_registered = false;
232 if (lwlock_stats_cxt != NULL)
233 MemoryContextDelete(lwlock_stats_cxt);
236 * The LWLock stats will be updated within a critical section, which
237 * requires allocating new hash entries. Allocations within a critical
238 * section are normally not allowed because running out of memory would
239 * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
240 * turned on in production, so that's an acceptable risk. The hash entries
241 * are small, so the risk of running out of memory is minimal in practice.
243 lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
245 ALLOCSET_DEFAULT_MINSIZE,
246 ALLOCSET_DEFAULT_INITSIZE,
247 ALLOCSET_DEFAULT_MAXSIZE);
248 MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
250 MemSet(&ctl, 0, sizeof(ctl));
251 ctl.keysize = sizeof(lwlock_stats_key);
252 ctl.entrysize = sizeof(lwlock_stats);
253 ctl.hcxt = lwlock_stats_cxt;
254 lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
255 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
256 if (!exit_registered)
258 on_shmem_exit(print_lwlock_stats, 0);
259 exit_registered = true;
264 print_lwlock_stats(int code, Datum arg)
266 HASH_SEQ_STATUS scan;
267 lwlock_stats *lwstats;
269 hash_seq_init(&scan, lwlock_stats_htab);
271 /* Grab an LWLock to keep different backends from mixing reports */
272 LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
274 while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
277 "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
278 MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
279 lwstats->key.instance, lwstats->sh_acquire_count,
280 lwstats->ex_acquire_count, lwstats->block_count,
281 lwstats->spin_delay_count, lwstats->dequeue_self_count);
284 LWLockRelease(&MainLWLockArray[0].lock);
287 static lwlock_stats *
288 get_lwlock_stats_entry(LWLock *lock)
290 lwlock_stats_key key;
291 lwlock_stats *lwstats;
295 * During shared memory initialization, the hash table doesn't exist yet.
296 * Stats of that phase aren't very interesting, so just collect operations
297 * on all locks in a single dummy entry.
299 if (lwlock_stats_htab == NULL)
300 return &lwlock_stats_dummy;
302 /* Fetch or create the entry. */
303 key.tranche = lock->tranche;
304 key.instance = T_ID(lock);
305 lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
308 lwstats->sh_acquire_count = 0;
309 lwstats->ex_acquire_count = 0;
310 lwstats->block_count = 0;
311 lwstats->dequeue_self_count = 0;
312 lwstats->spin_delay_count = 0;
316 #endif /* LWLOCK_STATS */
320 * Compute number of LWLocks to allocate in the main array.
328 * Possibly this logic should be spread out among the affected modules,
329 * the same way that shmem space estimation is done. But for now, there
330 * are few enough users of LWLocks that we can get away with just keeping
331 * the knowledge here.
334 /* Predefined LWLocks */
335 numLocks = NUM_FIXED_LWLOCKS;
337 /* bufmgr.c needs two for each shared buffer */
338 numLocks += 2 * NBuffers;
340 /* proc.c needs one for each backend or auxiliary process */
341 numLocks += MaxBackends + NUM_AUXILIARY_PROCS;
343 /* clog.c needs one per CLOG buffer */
344 numLocks += CLOGShmemBuffers();
346 /* commit_ts.c needs one per CommitTs buffer */
347 numLocks += CommitTsShmemBuffers();
349 /* subtrans.c needs one per SubTrans buffer */
350 numLocks += NUM_SUBTRANS_BUFFERS;
352 /* multixact.c needs two SLRU areas */
353 numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
355 /* async.c needs one per Async buffer */
356 numLocks += NUM_ASYNC_BUFFERS;
358 /* predicate.c needs one per old serializable xid buffer */
359 numLocks += NUM_OLDSERXID_BUFFERS;
361 /* slot.c needs one for each slot */
362 numLocks += max_replication_slots;
365 * Add any requested by loadable modules; for backwards-compatibility
366 * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
367 * there are no explicit requests.
369 lock_addin_request_allowed = false;
370 numLocks += Max(lock_addin_request, NUM_USER_DEFINED_LWLOCKS);
377 * RequestAddinLWLocks
378 * Request that extra LWLocks be allocated for use by a loadable module.
381 * This is only useful if called from the _PG_init hook of a library that
382 * is loaded into the postmaster via shared_preload_libraries. Once
383 * shared memory has been allocated, calls will be ignored. (We could
384 * raise an error, but it seems better to make it a no-op, so that
385 * libraries containing such calls can be reloaded if needed.)
388 RequestAddinLWLocks(int n)
390 if (IsUnderPostmaster || !lock_addin_request_allowed)
391 return; /* too late */
392 lock_addin_request += n;
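/*
 * For illustration, a loadable module would typically request and later
 * obtain its lock roughly like this (hypothetical extension code, not part
 * of this file; "my_lock" and "my_shmem_startup" are made-up names):
 *
 *		static LWLock *my_lock;
 *
 *		void
 *		_PG_init(void)
 *		{
 *			RequestAddinLWLocks(1);
 *		}
 *
 *		static void
 *		my_shmem_startup(void)
 *		{
 *			my_lock = LWLockAssign();
 *		}
 */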
397 * Compute shmem space needed for LWLocks.
400 LWLockShmemSize(void)
403 int numLocks = NumLWLocks();
405 /* Space for the LWLock array. */
406 size = mul_size(numLocks, sizeof(LWLockPadded));
408 /* Space for dynamic allocation counter, plus room for alignment. */
409 size = add_size(size, 3 * sizeof(int) + LWLOCK_PADDED_SIZE);
416 * Allocate shmem space for the main LWLock array and initialize it. We also
417 * register the main tranche here.
422 StaticAssertExpr(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
423 "MAX_BACKENDS too big for lwlock.c");
425 if (!IsUnderPostmaster)
427 int numLocks = NumLWLocks();
428 Size spaceLocks = LWLockShmemSize();
435 ptr = (char *) ShmemAlloc(spaceLocks);
437 /* Leave room for dynamic allocation of locks and tranches */
438 ptr += 3 * sizeof(int);
440 /* Ensure desired alignment of LWLock array */
441 ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
443 MainLWLockArray = (LWLockPadded *) ptr;
445 /* Initialize all LWLocks in main array */
446 for (id = 0, lock = MainLWLockArray; id < numLocks; id++, lock++)
447 LWLockInitialize(&lock->lock, 0);
450 * Initialize the dynamic-allocation counters, which are stored just
451 * before the first LWLock. LWLockCounter[0] is the allocation
452 * counter for lwlocks, LWLockCounter[1] is the maximum number that
453 * can be allocated from the main array, and LWLockCounter[2] is the
454 * allocation counter for tranches.
456 LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
457 LWLockCounter[0] = NUM_FIXED_LWLOCKS;
458 LWLockCounter[1] = numLocks;
459 LWLockCounter[2] = 1; /* 0 is the main array */
462 if (LWLockTrancheArray == NULL)
464 LWLockTranchesAllocated = 16;
465 LWLockTrancheArray = (LWLockTranche **)
466 MemoryContextAlloc(TopMemoryContext,
467 LWLockTranchesAllocated * sizeof(LWLockTranche *));
470 MainLWLockTranche.name = "main";
471 MainLWLockTranche.array_base = MainLWLockArray;
472 MainLWLockTranche.array_stride = sizeof(LWLockPadded);
473 LWLockRegisterTranche(0, &MainLWLockTranche);
477 * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
480 InitLWLockAccess(void)
488 * LWLockAssign - assign a dynamically-allocated LWLock number
490 * We interlock this using the same spinlock that is used to protect
491 * ShmemAlloc(). Interlocking is not really necessary during postmaster
492 * startup, but it is needed if any user-defined code tries to allocate
493 * LWLocks after startup.
501 LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
502 SpinLockAcquire(ShmemLock);
503 if (LWLockCounter[0] >= LWLockCounter[1])
505 SpinLockRelease(ShmemLock);
506 elog(ERROR, "no more LWLocks available");
508 result = &MainLWLockArray[LWLockCounter[0]++].lock;
509 SpinLockRelease(ShmemLock);
514 * Allocate a new tranche ID.
517 LWLockNewTrancheId(void)
522 LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
523 SpinLockAcquire(ShmemLock);
524 result = LWLockCounter[2]++;
525 SpinLockRelease(ShmemLock);
531 * Register a tranche ID in the lookup table for the current process. This
532 * routine will save a pointer to the tranche object passed as an argument,
533 * so that object should be allocated in a backend-lifetime context
534 * (TopMemoryContext, static variable, or similar).
537 LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche)
539 Assert(LWLockTrancheArray != NULL);
541 if (tranche_id >= LWLockTranchesAllocated)
543 int i = LWLockTranchesAllocated;
545 while (i <= tranche_id) i *= 2;
548 LWLockTrancheArray = (LWLockTranche **)
549 repalloc(LWLockTrancheArray,
550 i * sizeof(LWLockTranche *));
551 LWLockTranchesAllocated = i;
554 LWLockTrancheArray[tranche_id] = tranche;
558 * LWLockInitialize - initialize a new lwlock; it's initially unlocked
561 LWLockInitialize(LWLock *lock, int tranche_id)
563 SpinLockInit(&lock->mutex);
564 pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
566 pg_atomic_init_u32(&lock->nwaiters, 0);
568 lock->tranche = tranche_id;
569 dlist_init(&lock->waiters);
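/*
 * For illustration, a module wanting its own tranche of LWLocks might tie
 * the pieces above together roughly as follows (a simplified sketch;
 * "my_tranche", "my_locks" and "nlocks" are made-up names, and the lock
 * array is assumed to live in shared memory):
 *
 *		static LWLockTranche my_tranche;
 *		static LWLockPadded *my_locks;
 *		int			trancheid,
 *					i;
 *
 *		trancheid = LWLockNewTrancheId();
 *		my_tranche.name = "my_module";
 *		my_tranche.array_base = my_locks;
 *		my_tranche.array_stride = sizeof(LWLockPadded);
 *		LWLockRegisterTranche(trancheid, &my_tranche);
 *		for (i = 0; i < nlocks; i++)
 *			LWLockInitialize(&my_locks[i].lock, trancheid);
 */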
573 * Internal function that tries to atomically acquire the lwlock in the passed mode.
576 * This function will not block waiting for a lock to become free - that's the caller's job.
579 * Returns true if the lock isn't free and we need to wait.
582 LWLockAttemptLock(LWLock *lock, LWLockMode mode)
584 AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
586 /* loop until we've determined whether we could acquire the lock or not */
590 uint32 expected_state;
591 uint32 desired_state;
594 old_state = pg_atomic_read_u32(&lock->state);
595 expected_state = old_state;
596 desired_state = expected_state;
598 if (mode == LW_EXCLUSIVE)
600 lock_free = (expected_state & LW_LOCK_MASK) == 0;
602 desired_state += LW_VAL_EXCLUSIVE;
606 lock_free = (expected_state & LW_VAL_EXCLUSIVE) == 0;
608 desired_state += LW_VAL_SHARED;
612 * Attempt to swap in the state we are expecting. If we didn't see
613 * the lock as free, that's just the old value. If we saw it as free,
614 * we'll attempt to mark it acquired. The reason that we always swap
615 * in the value is that this doubles as a memory barrier. We could try
616 * to be smarter and only swap in values if we saw the lock as free,
617 * but benchmarks haven't shown it to be beneficial so far.
619 * Retry if the value changed since we last looked at it.
621 if (pg_atomic_compare_exchange_u32(&lock->state,
622 &expected_state, desired_state))
626 /* Great! Got the lock. */
628 if (mode == LW_EXCLUSIVE)
629 lock->owner = MyProc;
634 return true; /* somebody else has the lock */
641 * Wake up all the lockers that currently have a chance to acquire the lock.
644 LWLockWakeup(LWLock *lock)
647 bool wokeup_somebody = false;
649 dlist_mutable_iter iter;
651 lwlock_stats *lwstats;
653 lwstats = get_lwlock_stats_entry(lock);
658 new_release_ok = true;
660 /* Acquire mutex. Time spent holding mutex should be short! */
662 lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
664 SpinLockAcquire(&lock->mutex);
667 dlist_foreach_modify(iter, &lock->waiters)
669 PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
671 if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
674 dlist_delete(&waiter->lwWaitLink);
675 dlist_push_tail(&wakeup, &waiter->lwWaitLink);
677 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
680 * Prevent additional wakeups until the retryer gets to run. Backends
681 * that are just waiting for the lock to become free don't retry automatically.
684 new_release_ok = false;
686 * Don't wake up (further) exclusive lockers.
688 wokeup_somebody = true;
692 * Once we've woken up an exclusive locker, there's no point in waking anybody else.
695 if (waiter->lwWaitMode == LW_EXCLUSIVE)
699 Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
701 /* Unset both flags at once if required */
702 if (!new_release_ok && dlist_is_empty(&wakeup))
703 pg_atomic_fetch_and_u32(&lock->state,
704 ~(LW_FLAG_RELEASE_OK | LW_FLAG_HAS_WAITERS));
705 else if (!new_release_ok)
706 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_RELEASE_OK);
707 else if (dlist_is_empty(&wakeup))
708 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
709 else if (new_release_ok)
710 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
712 /* We are done updating the shared state of the lock queue. */
713 SpinLockRelease(&lock->mutex);
715 /* Awaken any waiters I removed from the queue. */
716 dlist_foreach_modify(iter, &wakeup)
718 PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
720 LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
721 dlist_delete(&waiter->lwWaitLink);
723 * Guarantee that lwWaiting being unset only becomes visible once the
724 * unlink from the list has completed. Otherwise the target backend
725 * could be woken up for some other reason and enqueue for a new lock - if
726 * that happens before the list unlink happens, the list would end up corrupted.
729 * The barrier pairs with the SpinLockAcquire() when enqueuing for another lock.
733 waiter->lwWaiting = false;
734 PGSemaphoreUnlock(&waiter->sem);
739 * Add ourselves to the end of the queue.
741 * NB: Mode can be LW_WAIT_UNTIL_FREE here!
744 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
747 lwlock_stats *lwstats;
749 lwstats = get_lwlock_stats_entry(lock);
753 * If we don't have a PGPROC structure, there's no way to wait. This
754 * should never occur, since MyProc should only be null during shared
755 * memory initialization.
758 elog(PANIC, "cannot wait without a PGPROC structure");
760 if (MyProc->lwWaiting)
761 elog(PANIC, "queueing for lock while waiting on another one");
764 lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
766 SpinLockAcquire(&lock->mutex);
769 /* setting the flag is protected by the spinlock */
770 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
772 MyProc->lwWaiting = true;
773 MyProc->lwWaitMode = mode;
775 /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
776 if (mode == LW_WAIT_UNTIL_FREE)
777 dlist_push_head(&lock->waiters, &MyProc->lwWaitLink);
779 dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink);
781 /* Can release the mutex now */
782 SpinLockRelease(&lock->mutex);
785 pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
791 * Remove ourselves from the waitlist.
793 * This is used if we queued ourselves because we thought we needed to sleep
794 * but, after further checking, we discovered that we don't actually need to
795 * do so. Returns false if somebody else has already woken us up, otherwise returns true.
799 LWLockDequeueSelf(LWLock *lock)
802 dlist_mutable_iter iter;
805 lwlock_stats *lwstats;
807 lwstats = get_lwlock_stats_entry(lock);
809 lwstats->dequeue_self_count++;
813 lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
815 SpinLockAcquire(&lock->mutex);
819 * Can't just remove ourselves from the list; we need to iterate over
820 * all entries, as somebody else could already have dequeued us.
822 dlist_foreach_modify(iter, &lock->waiters)
824 PGPROC *proc = dlist_container(PGPROC, lwWaitLink, iter.cur);
828 dlist_delete(&proc->lwWaitLink);
833 if (dlist_is_empty(&lock->waiters) &&
834 (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
836 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
839 SpinLockRelease(&lock->mutex);
841 /* clear waiting state again, nice for debugging */
843 MyProc->lwWaiting = false;
849 * Somebody else dequeued us and has or will wake us up. Deal with the
850 * superfluous absorption of a wakeup.
854 * Reset releaseOk if somebody woke us before we removed ourselves -
855 * they'll have set it to false.
857 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
860 * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
861 * get reset at some inconvenient point later. Most of the time this
862 * will immediately return.
866 /* "false" means cannot accept cancel/die interrupt here. */
867 PGSemaphoreLock(&MyProc->sem, false);
868 if (!MyProc->lwWaiting)
874 * Fix the process wait semaphore's count for any absorbed wakeups.
876 while (extraWaits-- > 0)
877 PGSemaphoreUnlock(&MyProc->sem);
882 /* not waiting anymore */
883 uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
884 Assert(nwaiters < MAX_BACKENDS);
890 * LWLockAcquire - acquire a lightweight lock in the specified mode
892 * If the lock is not available, sleep until it is. Returns true if the lock
893 * was available immediately, false if we had to sleep.
895 * Side effect: cancel/die interrupts are held off until lock release.
898 LWLockAcquire(LWLock *l, LWLockMode mode)
900 return LWLockAcquireCommon(l, mode, NULL, 0);
904 * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val
906 * The lock is always acquired in exclusive mode with this function.
909 LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val)
911 return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val);
914 /* internal function to implement LWLockAcquire and LWLockAcquireWithVar */
916 LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
918 PGPROC *proc = MyProc;
922 lwlock_stats *lwstats;
924 lwstats = get_lwlock_stats_entry(lock);
927 AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
929 PRINT_LWDEBUG("LWLockAcquire", lock, mode);
932 /* Count lock acquisition attempts */
933 if (mode == LW_EXCLUSIVE)
934 lwstats->ex_acquire_count++;
936 lwstats->sh_acquire_count++;
937 #endif /* LWLOCK_STATS */
940 * We can't wait if we haven't got a PGPROC. This should only occur
941 * during bootstrap or shared memory initialization. Put an Assert here
942 * to catch unsafe coding practices.
944 Assert(!(proc == NULL && IsUnderPostmaster));
946 /* Ensure we will have room to remember the lock */
947 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
948 elog(ERROR, "too many LWLocks taken");
951 * Lock out cancel/die interrupts until we exit the code section protected
952 * by the LWLock. This ensures that interrupts will not interfere with
953 * manipulations of data structures in shared memory.
958 * Loop here to try to acquire lock after each time we are signaled by
961 * NOTE: it might seem better to have LWLockRelease actually grant us the
962 * lock, rather than retrying and possibly having to go back to sleep. But
963 * in practice that is no good because it means a process swap for every
964 * lock acquisition when two or more processes are contending for the same
965 * lock. Since LWLocks are normally used to protect not-very-long
966 * sections of computation, a process needs to be able to acquire and
967 * release the same lock many times during a single CPU time slice, even
968 * in the presence of contention. The efficiency of being able to do that
969 * outweighs the inefficiency of sometimes wasting a process dispatch
970 * cycle because the lock is not free when a released waiter finally gets
971 * to run. See pgsql-hackers archives for 29-Dec-01.
978 * Try to grab the lock the first time, we're not in the waitqueue
981 mustwait = LWLockAttemptLock(lock, mode);
985 /* XXX: remove before commit? */
986 LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
987 break; /* got the lock */
991 * Ok, at this point we couldn't grab the lock on the first try. We
992 * cannot simply queue ourselves to the end of the list and wait to be
993 * woken up because by now the lock could long since have been released.
994 * Instead add us to the queue and try to grab the lock again. If we
995 * succeed we need to revert the queuing and be happy, otherwise we
996 * recheck the lock. If we still couldn't grab it, we know that the
997 * other locker will see our queue entries when releasing since they
998 * existed before we checked for the lock.
1001 /* add to the queue */
1002 LWLockQueueSelf(lock, mode);
1004 /* we're now guaranteed to be woken up if necessary */
1005 mustwait = LWLockAttemptLock(lock, mode);
1007 /* ok, grabbed the lock the second time round, need to undo queueing */
1010 LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1012 LWLockDequeueSelf(lock);
1017 * Wait until awakened.
1019 * Since we share the process wait semaphore with the regular lock
1020 * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1021 * while one of those is pending, it is possible that we get awakened
1022 * for a reason other than being signaled by LWLockRelease. If so,
1023 * loop back and wait again. Once we've gotten the LWLock,
1024 * re-increment the sema by the number of additional signals received,
1025 * so that the lock manager or signal manager will see the received
1026 * signal when it next waits.
1028 LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1031 lwstats->block_count++;
1034 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
1038 /* "false" means cannot accept cancel/die interrupt here. */
1039 PGSemaphoreLock(&proc->sem, false);
1040 if (!proc->lwWaiting)
1045 /* Retrying, allow LWLockRelease to release waiters again. */
1046 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1050 /* not waiting anymore */
1051 uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1052 Assert(nwaiters < MAX_BACKENDS);
1056 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
1058 LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1060 /* Now loop back and try to acquire lock again. */
1064 /* If there's a variable associated with this lock, initialize it */
1068 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
1070 /* Add lock to list of locks held by this backend */
1071 held_lwlocks[num_held_lwlocks].lock = lock;
1072 held_lwlocks[num_held_lwlocks++].mode = mode;
1075 * Fix the process wait semaphore's count for any absorbed wakeups.
1077 while (extraWaits-- > 0)
1078 PGSemaphoreUnlock(&proc->sem);
1084 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1086 * If the lock is not available, return FALSE with no side-effects.
1088 * If successful, cancel/die interrupts are held off until lock release.
1091 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1095 AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1097 PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1099 /* Ensure we will have room to remember the lock */
1100 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1101 elog(ERROR, "too many LWLocks taken");
1104 * Lock out cancel/die interrupts until we exit the code section protected
1105 * by the LWLock. This ensures that interrupts will not interfere with
1106 * manipulations of data structures in shared memory.
1110 /* Check for the lock */
1111 mustwait = LWLockAttemptLock(lock, mode);
1115 /* Failed to get lock, so release interrupt holdoff */
1116 RESUME_INTERRUPTS();
1118 LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1119 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), T_ID(lock), mode);
1123 /* Add lock to list of locks held by this backend */
1124 held_lwlocks[num_held_lwlocks].lock = lock;
1125 held_lwlocks[num_held_lwlocks++].mode = mode;
1126 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), T_ID(lock), mode);
1132 * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1134 * The semantics of this function are a bit funky. If the lock is currently
1135 * free, it is acquired in the given mode, and the function returns true. If
1136 * the lock isn't immediately free, the function waits until it is released
1137 * and returns false, but does not acquire the lock.
1139 * This is currently used for WALWriteLock: when a backend flushes the WAL,
1140 * holding WALWriteLock, it can flush the commit records of many other
1141 * backends as a side-effect. Those other backends need to wait until the
1142 * flush finishes, but don't need to acquire the lock anymore. They can just
1143 * wake up, observe that their records have already been flushed, and return.
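 *
 * For illustration, that pattern looks roughly like this (a simplified
 * sketch of the WAL-flush case described above, not the actual xlog.c code):
 *
 *		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *		{
 *			-- got the lock: flush WAL up to the requested point, then
 *			LWLockRelease(WALWriteLock);
 *		}
 *		else
 *		{
 *			-- somebody else held the lock and has since released it;
 *			-- recheck whether our records were already flushed on our behalf
 *		}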
1146 LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1148 PGPROC *proc = MyProc;
1152 lwlock_stats *lwstats;
1154 lwstats = get_lwlock_stats_entry(lock);
1157 Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1159 PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1161 /* Ensure we will have room to remember the lock */
1162 if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1163 elog(ERROR, "too many LWLocks taken");
1166 * Lock out cancel/die interrupts until we exit the code section protected
1167 * by the LWLock. This ensures that interrupts will not interfere with
1168 * manipulations of data structures in shared memory.
1173 * NB: We're using nearly the same twice-in-a-row lock acquisition
1174 * protocol as LWLockAcquire(). Check its comments for details.
1176 mustwait = LWLockAttemptLock(lock, mode);
1180 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1182 mustwait = LWLockAttemptLock(lock, mode);
1187 * Wait until awakened. Like in LWLockAcquire, be prepared for bogus
1188 * wakeups, because we share the semaphore with ProcWaitForSignal.
1190 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1193 lwstats->block_count++;
1195 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
1199 /* "false" means cannot accept cancel/die interrupt here. */
1200 PGSemaphoreLock(&proc->sem, false);
1201 if (!proc->lwWaiting)
1208 /* not waiting anymore */
1209 uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1210 Assert(nwaiters < MAX_BACKENDS);
1213 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
1215 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1219 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1222 * Got the lock on the second attempt; undo queueing. We need to
1223 * treat this as having successfully acquired the lock, otherwise
1224 * we'd not necessarily wake up people we've prevented from
1225 * acquiring the lock.
1227 LWLockDequeueSelf(lock);
1232 * Fix the process wait semaphore's count for any absorbed wakeups.
1234 while (extraWaits-- > 0)
1235 PGSemaphoreUnlock(&proc->sem);
1239 /* Failed to get lock, so release interrupt holdoff */
1240 RESUME_INTERRUPTS();
1241 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1242 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), T_ID(lock),
1247 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1248 /* Add lock to list of locks held by this backend */
1249 held_lwlocks[num_held_lwlocks].lock = lock;
1250 held_lwlocks[num_held_lwlocks++].mode = mode;
1251 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock), mode);
1258 * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1260 * If the lock is held and *valptr equals oldval, waits until the lock is
1261 * either freed, or the lock holder updates *valptr by calling
1262 * LWLockUpdateVar. If the lock is free on exit (immediately or after
1263 * waiting), returns true. If the lock is still held, but *valptr no longer
1264 * matches oldval, returns false and sets *newval to the current value in
1267 * It's possible that the lock holder releases the lock, but another backend
1268 * acquires it again before we get a chance to observe that the lock was
1269 * momentarily released. We wouldn't need to wait for the new lock holder,
1270 * but we cannot distinguish that case, so we will have to wait.
1272 * Note: this function ignores shared lock holders; if the lock is held
1273 * in shared mode, returns 'true'.
1276 LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1278 PGPROC *proc = MyProc;
1280 bool result = false;
1282 lwlock_stats *lwstats;
1284 lwstats = get_lwlock_stats_entry(lock);
1287 PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1290 * Quick test first to see if the lock is free right now.
1292 * XXX: the caller uses a spinlock before this, so we don't need a memory
1293 * barrier here as far as the current usage is concerned. But that might
1294 * not be safe in general.
1296 if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0)
1300 * Lock out cancel/die interrupts while we sleep on the lock. There is no
1301 * cleanup mechanism to remove us from the wait queue if we got
1307 * Loop here to check the lock's status after each time we are signaled.
1314 mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1319 * Perform comparison using spinlock as we can't rely on atomic 64-bit reads/stores.
1323 lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
1325 SpinLockAcquire(&lock->mutex);
1329 * XXX: We can significantly optimize this on platforms with 64-bit atomics.
1333 if (value != oldval)
1341 SpinLockRelease(&lock->mutex);
1347 break; /* the lock was free or value didn't match */
1350 * Add myself to the wait queue. Note that this is racy; somebody else
1351 * could wake up before we're finished queuing.
1352 * NB: We're using nearly the same twice-in-a-row lock acquisition
1353 * protocol as LWLockAcquire(). Check its comments for details.
1355 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1358 * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1361 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1364 * We're now guaranteed to be woken up if necessary. Recheck the
1367 mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1369 /* Ok, lock is free after we queued ourselves. Undo queueing. */
1372 LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1374 LWLockDequeueSelf(lock);
1379 * Wait until awakened.
1381 * Since we share the process wait semaphore with the regular lock
1382 * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1383 * while one of those is pending, it is possible that we get awakened
1384 * for a reason other than being signaled by LWLockRelease. If so,
1385 * loop back and wait again. Once we've gotten the LWLock,
1386 * re-increment the sema by the number of additional signals received,
1387 * so that the lock manager or signal manager will see the received
1388 * signal when it next waits.
1390 LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1393 lwstats->block_count++;
1396 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock),
1401 /* "false" means cannot accept cancel/die interrupt here. */
1402 PGSemaphoreLock(&proc->sem, false);
1403 if (!proc->lwWaiting)
1410 /* not waiting anymore */
1411 uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1412 Assert(nwaiters < MAX_BACKENDS);
1416 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock),
1419 LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1421 /* Now loop back and check the status of the lock again. */
1424 TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE);
1427 * Fix the process wait semaphore's count for any absorbed wakeups.
1429 while (extraWaits-- > 0)
1430 PGSemaphoreUnlock(&proc->sem);
1433 * Now okay to allow cancel/die interrupts.
1435 RESUME_INTERRUPTS();
1442 * LWLockUpdateVar - Update a variable and wake up waiters atomically
1444 * Sets *valptr to 'val', and wakes up all processes waiting for us with
1445 * LWLockWaitForVar(). Setting the value and waking up the processes happen
1446 * atomically so that any process calling LWLockWaitForVar() on the same lock
1447 * is guaranteed to see the new value, and act accordingly.
1449 * The caller must be holding the lock in exclusive mode.
1452 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1455 dlist_mutable_iter iter;
1457 lwlock_stats *lwstats;
1459 lwstats = get_lwlock_stats_entry(lock);
1462 PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1464 dlist_init(&wakeup);
1466 /* Acquire mutex. Time spent holding mutex should be short! */
1468 lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
1470 SpinLockAcquire(&lock->mutex);
1473 Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1475 /* Update the lock's value */
1479 * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1480 * up. They are always in the front of the queue.
1482 dlist_foreach_modify(iter, &lock->waiters)
1484 PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
1486 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1489 dlist_delete(&waiter->lwWaitLink);
1490 dlist_push_tail(&wakeup, &waiter->lwWaitLink);
1493 /* We are done updating shared state of the lock itself. */
1494 SpinLockRelease(&lock->mutex);
1497 * Awaken any waiters I removed from the queue.
1499 dlist_foreach_modify(iter, &wakeup)
1501 PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
1502 dlist_delete(&waiter->lwWaitLink);
1503 /* check comment in LWLockWakeup() about this barrier */
1505 waiter->lwWaiting = false;
1506 PGSemaphoreUnlock(&waiter->sem);
1512 * LWLockRelease - release a previously acquired lock
1515 LWLockRelease(LWLock *lock)
1523 * Remove lock from list of locks held. Usually, but not always, it will
1524 * be the latest-acquired lock; so search array backwards.
1526 for (i = num_held_lwlocks; --i >= 0;)
1528 if (lock == held_lwlocks[i].lock)
1530 mode = held_lwlocks[i].mode;
1535 elog(ERROR, "lock %s %d is not held", T_NAME(lock), T_ID(lock));
1537 for (; i < num_held_lwlocks; i++)
1538 held_lwlocks[i] = held_lwlocks[i + 1];
1540 PRINT_LWDEBUG("LWLockRelease", lock, mode);
1543 * Release my hold on the lock; after that it can immediately be acquired by
1544 * others, even if we still have to wake up other waiters.
1546 if (mode == LW_EXCLUSIVE)
1547 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1549 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1551 /* nobody else can have that kind of lock */
1552 Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1556 * We're still waiting for backends to get scheduled, don't wake them up
1559 if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1560 (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1561 (oldstate & LW_LOCK_MASK) == 0)
1562 check_waiters = true;
1564 check_waiters = false;
1567 * As waking up waiters requires the spinlock to be acquired, only do so
1572 /* XXX: remove before commit? */
1573 LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1577 TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock));
1580 * Now okay to allow cancel/die interrupts.
1582 RESUME_INTERRUPTS();
1587 * LWLockReleaseAll - release all currently-held locks
1589 * Used to clean up after ereport(ERROR). An important difference between this
1590 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1591 * unchanged by this operation. This is necessary since InterruptHoldoffCount
1592 * has been set to an appropriate level earlier in error recovery. We could
1593 * decrement it below zero if we allow it to drop for each released lock!
1596 LWLockReleaseAll(void)
1598 while (num_held_lwlocks > 0)
1600 HOLD_INTERRUPTS(); /* match the upcoming RESUME_INTERRUPTS */
1602 LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1608 * LWLockHeldByMe - test whether my process currently holds a lock
1610 * This is meant as debug support only. We currently do not distinguish
1611 * whether the lock is held shared or exclusive.
1614 LWLockHeldByMe(LWLock *l)
1618 for (i = 0; i < num_held_lwlocks; i++)
1620 if (held_lwlocks[i].lock == l)