1 /*-------------------------------------------------------------------------
2  *
3  * lwlock.c
4  *        Lightweight lock manager
5  *
6  * Lightweight locks are intended primarily to provide mutual exclusion of
7  * access to shared-memory data structures.  Therefore, they offer both
8  * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object).  There are few other frammishes.  User-level
10  * locking should be done with the full lock manager --- which depends on
11  * LWLocks to protect its shared state.
12  *
13  * In addition to exclusive and shared modes, lightweight locks can be used
14  * to wait until a variable changes value.  The variable is initially set
15  * when the lock is acquired with LWLockAcquireWithVar, and can be updated
16  * without releasing the lock by calling LWLockUpdateVar.  LWLockWaitForVar
17  * waits for the variable to be updated, or until the lock is free.  The
18  * meaning of the variable is up to the caller; the lightweight lock code
19  * just assigns and compares it.
20  *
21  * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
22  * Portions Copyright (c) 1994, Regents of the University of California
23  *
24  * IDENTIFICATION
25  *        src/backend/storage/lmgr/lwlock.c
26  *
27  * NOTES:
28  *
29  * This used to be a pretty straightforward reader-writer lock
30  * implementation, in which the internal state was protected by a
31  * spinlock. Unfortunately the overhead of taking the spinlock proved to be
32  * too high for workloads/locks that were taken in shared mode very
33  * frequently. Often we were spinning in the (obviously exclusive) spinlock,
34  * while trying to acquire a shared lock that was actually free.
35  *
36  * Thus a new implementation was devised that provides wait-free shared lock
37  * acquisition for locks that aren't exclusively locked.
38  *
39  * The basic idea is to have a single atomic state variable instead of
40  * the formerly separate shared and exclusive counters and to use atomic
41  * operations to acquire the lock. That's fairly easy to do for plain
42  * rw-spinlocks, but a lot harder for something like LWLocks that want to wait
43  * in the OS.
44  *
45  * For lock acquisition we use an atomic compare-and-exchange on the state
46  * variable. For an exclusive lock we swap in a sentinel value
47  * (LW_VAL_EXCLUSIVE), for shared locks we count the number of holders.
48  *
49  * To release the lock we use an atomic decrement. If the new value
50  * is zero (we get that atomically), we know we can/have to release
51  * waiters.
52  *
53  * Obviously it is important that the sentinel value for exclusive locks
54  * doesn't conflict with the maximum number of possible shared lockers -
55  * luckily MAX_BACKENDS makes that easily possible.
56  *
57  *
58  * The attentive reader might have noticed that naively doing the above has a
59  * glaring race condition: We try to lock using the atomic operations and
60  * notice that we have to wait. Unfortunately by the time we have finished
61  * queuing, the former locker very well might have already finished its
62  * work. That's problematic because we're now stuck waiting inside the OS.
63  *
64  * To mitigate those races we use a two phased attempt at locking:
65  *   Phase 1: Try to do it atomically, if we succeed, nice
66  *   Phase 2: Add ourselves to the waitqueue of the lock
67  *   Phase 3: Try to grab the lock again, if we succeed, remove ourselves from
68  *            the queue
69  *   Phase 4: Sleep till wake-up, goto Phase 1
70  *
71  * This protects us against the problem from above as nobody can release too
72  * quickly, before we're queued, since after Phase 2 we're already queued.
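 *
 * As a rough illustration only (a simplified sketch in terms of the helper
 * functions defined further down in this file, ignoring the RELEASE_OK
 * bookkeeping, statistics and error handling), the acquisition path boils
 * down to:
 *
 *   for (;;)
 *   {
 *       if (!LWLockAttemptLock(lock, mode))
 *           break;                              -- Phase 1: got it atomically
 *       LWLockQueueSelf(lock, mode);            -- Phase 2: enqueue ourselves
 *       if (!LWLockAttemptLock(lock, mode))
 *       {
 *           LWLockDequeueSelf(lock);            -- Phase 3: got it after all
 *           break;
 *       }
 *       PGSemaphoreLock(&MyProc->sem, false);   -- Phase 4: sleep, then retry
 *   }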
73  * -------------------------------------------------------------------------
74  */
75 #include "postgres.h"
76
77 #include "access/clog.h"
78 #include "access/commit_ts.h"
79 #include "access/multixact.h"
80 #include "access/subtrans.h"
81 #include "commands/async.h"
82 #include "miscadmin.h"
83 #include "pg_trace.h"
84 #include "postmaster/postmaster.h"
85 #include "replication/slot.h"
86 #include "storage/ipc.h"
87 #include "storage/predicate.h"
88 #include "storage/proc.h"
89 #include "storage/spin.h"
90 #include "utils/memutils.h"
91
92 #ifdef LWLOCK_STATS
93 #include "utils/hsearch.h"
94 #endif
95
96
97 /* We use the ShmemLock spinlock to protect LWLockAssign */
98 extern slock_t *ShmemLock;
99
100 #define LW_FLAG_HAS_WAITERS                     ((uint32) 1 << 30)
101 #define LW_FLAG_RELEASE_OK                      ((uint32) 1 << 29)
102
103 #define LW_VAL_EXCLUSIVE                        ((uint32) 1 << 24)
104 #define LW_VAL_SHARED                           1
105
106 #define LW_LOCK_MASK                            ((uint32) ((1 << 25)-1))
107 /* Must be greater than MAX_BACKENDS - which is 2^23-1, so we're fine. */
108 #define LW_SHARED_MASK                          ((uint32) ((1 << 24)-1))
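
/*
 * Rough layout of the 'state' word implied by the definitions above; bit 31
 * and bits 25..28 are currently unused:
 *
 *   bit 30      LW_FLAG_HAS_WAITERS
 *   bit 29      LW_FLAG_RELEASE_OK
 *   bit 24      LW_VAL_EXCLUSIVE
 *   bits 0..23  number of shared holders (LW_VAL_SHARED added per holder)
 */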
109
110 /*
111  * This is indexed by tranche ID and stores metadata for all tranches known
112  * to the current backend.
113  */
114 static LWLockTranche **LWLockTrancheArray = NULL;
115 static int      LWLockTranchesAllocated = 0;
116
117 #define T_NAME(lock) \
118         (LWLockTrancheArray[(lock)->tranche]->name)
119 #define T_ID(lock) \
120         ((int) ((((char *) lock) - \
121                 ((char *) LWLockTrancheArray[(lock)->tranche]->array_base)) / \
122                 LWLockTrancheArray[(lock)->tranche]->array_stride))
123
124 /*
125  * This points to the main array of LWLocks in shared memory.  Backends inherit
126  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
127  * where we have special measures to pass it down).
128  */
129 LWLockPadded *MainLWLockArray = NULL;
130 static LWLockTranche MainLWLockTranche;
131
132 /*
133  * We use this structure to keep track of locked LWLocks for release
134  * during error recovery.  Normally, only a few will be held at once, but
135  * occasionally the number can be much higher; for example, the pg_buffercache
136  * extension locks all buffer partitions simultaneously.
137  */
138 #define MAX_SIMUL_LWLOCKS       200
139
140 /* struct representing the LWLocks we're holding */
141 typedef struct LWLockHandle
142 {
143         LWLock *lock;
144         LWLockMode      mode;
145 } LWLockHandle;
146
147 static int      num_held_lwlocks = 0;
148 static LWLockHandle held_lwlocks[MAX_SIMUL_LWLOCKS];
149
150 static int      lock_addin_request = 0;
151 static bool lock_addin_request_allowed = true;
152
153 static inline bool LWLockAcquireCommon(LWLock *l, LWLockMode mode,
154                                         uint64 *valptr, uint64 val);
155
156 #ifdef LWLOCK_STATS
157 typedef struct lwlock_stats_key
158 {
159         int                     tranche;
160         int                     instance;
161 }       lwlock_stats_key;
162
163 typedef struct lwlock_stats
164 {
165         lwlock_stats_key key;
166         int                     sh_acquire_count;
167         int                     ex_acquire_count;
168         int                     block_count;
169         int                     dequeue_self_count;
170         int                     spin_delay_count;
171 }       lwlock_stats;
172
173 static HTAB *lwlock_stats_htab;
174 static lwlock_stats lwlock_stats_dummy;
175 #endif
176
177 #ifdef LOCK_DEBUG
178 bool            Trace_lwlocks = false;
179
180 inline static void
181 PRINT_LWDEBUG(const char *where, LWLock *lock, LWLockMode mode)
182 {
183         /* hide statement & context here, otherwise the log is just too verbose */
184         if (Trace_lwlocks)
185         {
186                 uint32 state = pg_atomic_read_u32(&lock->state);
187                 ereport(LOG,
188                                 (errhidestmt(true),
189                                  errhidecontext(true),
190                                  errmsg("%d: %s(%s %d): excl %u shared %u haswaiters %u waiters %u rOK %d",
191                                                 MyProcPid,
192                                                 where, T_NAME(lock), T_ID(lock),
193                                                 !!(state & LW_VAL_EXCLUSIVE),
194                                                 state & LW_SHARED_MASK,
195                                                 !!(state & LW_FLAG_HAS_WAITERS),
196                                                 pg_atomic_read_u32(&lock->nwaiters),
197                                                 !!(state & LW_FLAG_RELEASE_OK))));
198         }
199 }
200
201 inline static void
202 LOG_LWDEBUG(const char *where, LWLock *lock, const char *msg)
203 {
204         /* hide statement & context here, otherwise the log is just too verbose */
205         if (Trace_lwlocks)
206         {
207                 ereport(LOG,
208                                 (errhidestmt(true),
209                                  errhidecontext(true),
210                                  errmsg("%s(%s %d): %s", where, T_NAME(lock), T_ID(lock), msg)));
211         }
212 }
213
214 #else                                                   /* not LOCK_DEBUG */
215 #define PRINT_LWDEBUG(a,b,c) ((void)0)
216 #define LOG_LWDEBUG(a,b,c) ((void)0)
217 #endif   /* LOCK_DEBUG */
218
219 #ifdef LWLOCK_STATS
220
221 static void init_lwlock_stats(void);
222 static void print_lwlock_stats(int code, Datum arg);
223 static lwlock_stats *get_lwlock_stats_entry(LWLock *lockid);
224
225 static void
226 init_lwlock_stats(void)
227 {
228         HASHCTL         ctl;
229         static MemoryContext lwlock_stats_cxt = NULL;
230         static bool exit_registered = false;
231
232         if (lwlock_stats_cxt != NULL)
233                 MemoryContextDelete(lwlock_stats_cxt);
234
235         /*
236          * The LWLock stats will be updated within a critical section, which
237          * requires allocating new hash entries. Allocations within a critical
238          * section are normally not allowed because running out of memory would
239          * lead to a PANIC, but LWLOCK_STATS is debugging code that's not normally
240          * turned on in production, so that's an acceptable risk. The hash entries
241          * are small, so the risk of running out of memory is minimal in practice.
242          */
243         lwlock_stats_cxt = AllocSetContextCreate(TopMemoryContext,
244                                                                                          "LWLock stats",
245                                                                                          ALLOCSET_DEFAULT_MINSIZE,
246                                                                                          ALLOCSET_DEFAULT_INITSIZE,
247                                                                                          ALLOCSET_DEFAULT_MAXSIZE);
248         MemoryContextAllowInCriticalSection(lwlock_stats_cxt, true);
249
250         MemSet(&ctl, 0, sizeof(ctl));
251         ctl.keysize = sizeof(lwlock_stats_key);
252         ctl.entrysize = sizeof(lwlock_stats);
253         ctl.hcxt = lwlock_stats_cxt;
254         lwlock_stats_htab = hash_create("lwlock stats", 16384, &ctl,
255                                                                         HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
256         if (!exit_registered)
257         {
258                 on_shmem_exit(print_lwlock_stats, 0);
259                 exit_registered = true;
260         }
261 }
262
263 static void
264 print_lwlock_stats(int code, Datum arg)
265 {
266         HASH_SEQ_STATUS scan;
267         lwlock_stats *lwstats;
268
269         hash_seq_init(&scan, lwlock_stats_htab);
270
271         /* Grab an LWLock to keep different backends from mixing reports */
272         LWLockAcquire(&MainLWLockArray[0].lock, LW_EXCLUSIVE);
273
274         while ((lwstats = (lwlock_stats *) hash_seq_search(&scan)) != NULL)
275         {
276                 fprintf(stderr,
277                                 "PID %d lwlock %s %d: shacq %u exacq %u blk %u spindelay %u dequeue self %u\n",
278                                 MyProcPid, LWLockTrancheArray[lwstats->key.tranche]->name,
279                                 lwstats->key.instance, lwstats->sh_acquire_count,
280                                 lwstats->ex_acquire_count, lwstats->block_count,
281                                 lwstats->spin_delay_count, lwstats->dequeue_self_count);
282         }
283
284         LWLockRelease(&MainLWLockArray[0].lock);
285 }
286
287 static lwlock_stats *
288 get_lwlock_stats_entry(LWLock *lock)
289 {
290         lwlock_stats_key key;
291         lwlock_stats *lwstats;
292         bool            found;
293
294         /*
295          * During shared memory initialization, the hash table doesn't exist yet.
296          * Stats of that phase aren't very interesting, so just collect operations
297          * on all locks in a single dummy entry.
298          */
299         if (lwlock_stats_htab == NULL)
300                 return &lwlock_stats_dummy;
301
302         /* Fetch or create the entry. */
303         key.tranche = lock->tranche;
304         key.instance = T_ID(lock);
305         lwstats = hash_search(lwlock_stats_htab, &key, HASH_ENTER, &found);
306         if (!found)
307         {
308                 lwstats->sh_acquire_count = 0;
309                 lwstats->ex_acquire_count = 0;
310                 lwstats->block_count = 0;
311                 lwstats->dequeue_self_count = 0;
312                 lwstats->spin_delay_count = 0;
313         }
314         return lwstats;
315 }
316 #endif   /* LWLOCK_STATS */
317
318
319 /*
320  * Compute number of LWLocks to allocate in the main array.
321  */
322 static int
323 NumLWLocks(void)
324 {
325         int                     numLocks;
326
327         /*
328          * Possibly this logic should be spread out among the affected modules,
329          * the same way that shmem space estimation is done.  But for now, there
330          * are few enough users of LWLocks that we can get away with just keeping
331          * the knowledge here.
332          */
333
334         /* Predefined LWLocks */
335         numLocks = NUM_FIXED_LWLOCKS;
336
337         /* bufmgr.c needs two for each shared buffer */
338         numLocks += 2 * NBuffers;
339
340         /* proc.c needs one for each backend or auxiliary process */
341         numLocks += MaxBackends + NUM_AUXILIARY_PROCS;
342
343         /* clog.c needs one per CLOG buffer */
344         numLocks += CLOGShmemBuffers();
345
346         /* commit_ts.c needs one per CommitTs buffer */
347         numLocks += CommitTsShmemBuffers();
348
349         /* subtrans.c needs one per SubTrans buffer */
350         numLocks += NUM_SUBTRANS_BUFFERS;
351
352         /* multixact.c needs two SLRU areas */
353         numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
354
355         /* async.c needs one per Async buffer */
356         numLocks += NUM_ASYNC_BUFFERS;
357
358         /* predicate.c needs one per old serializable xid buffer */
359         numLocks += NUM_OLDSERXID_BUFFERS;
360
361         /* slot.c needs one for each slot */
362         numLocks += max_replication_slots;
363
364         /*
365          * Add any requested by loadable modules; for backwards-compatibility
366          * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
367          * there are no explicit requests.
368          */
369         lock_addin_request_allowed = false;
370         numLocks += Max(lock_addin_request, NUM_USER_DEFINED_LWLOCKS);
371
372         return numLocks;
373 }
374
375
376 /*
377  * RequestAddinLWLocks
378  *              Request that extra LWLocks be allocated for use by
379  *              a loadable module.
380  *
381  * This is only useful if called from the _PG_init hook of a library that
382  * is loaded into the postmaster via shared_preload_libraries.  Once
383  * shared memory has been allocated, calls will be ignored.  (We could
384  * raise an error, but it seems better to make it a no-op, so that
385  * libraries containing such calls can be reloaded if needed.)
386  */
387 void
388 RequestAddinLWLocks(int n)
389 {
390         if (IsUnderPostmaster || !lock_addin_request_allowed)
391                 return;                                 /* too late */
392         lock_addin_request += n;
393 }
394
395
396 /*
397  * Compute shmem space needed for LWLocks.
398  */
399 Size
400 LWLockShmemSize(void)
401 {
402         Size            size;
403         int                     numLocks = NumLWLocks();
404
405         /* Space for the LWLock array. */
406         size = mul_size(numLocks, sizeof(LWLockPadded));
407
408         /* Space for dynamic allocation counter, plus room for alignment. */
409         size = add_size(size, 3 * sizeof(int) + LWLOCK_PADDED_SIZE);
410
411         return size;
412 }
413
414
415 /*
416  * Allocate shmem space for the main LWLock array and initialize it.  We also
417  * register the main tranche here.
418  */
419 void
420 CreateLWLocks(void)
421 {
422         StaticAssertExpr(LW_VAL_EXCLUSIVE > (uint32) MAX_BACKENDS,
423                                          "MAX_BACKENDS too big for lwlock.c");
424
425         if (!IsUnderPostmaster)
426         {
427                 int                     numLocks = NumLWLocks();
428                 Size            spaceLocks = LWLockShmemSize();
429                 LWLockPadded *lock;
430                 int                *LWLockCounter;
431                 char       *ptr;
432                 int                     id;
433
434                 /* Allocate space */
435                 ptr = (char *) ShmemAlloc(spaceLocks);
436
437                 /* Leave room for dynamic allocation of locks and tranches */
438                 ptr += 3 * sizeof(int);
439
440                 /* Ensure desired alignment of LWLock array */
441                 ptr += LWLOCK_PADDED_SIZE - ((uintptr_t) ptr) % LWLOCK_PADDED_SIZE;
442
443                 MainLWLockArray = (LWLockPadded *) ptr;
444
445                 /* Initialize all LWLocks in main array */
446                 for (id = 0, lock = MainLWLockArray; id < numLocks; id++, lock++)
447                         LWLockInitialize(&lock->lock, 0);
448
449                 /*
450                  * Initialize the dynamic-allocation counters, which are stored just
451                  * before the first LWLock.  LWLockCounter[0] is the allocation
452                  * counter for lwlocks, LWLockCounter[1] is the maximum number that
453                  * can be allocated from the main array, and LWLockCounter[2] is the
454                  * allocation counter for tranches.
455                  */
456                 LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
457                 LWLockCounter[0] = NUM_FIXED_LWLOCKS;
458                 LWLockCounter[1] = numLocks;
459                 LWLockCounter[2] = 1;   /* 0 is the main array */
460         }
461
462         if (LWLockTrancheArray == NULL)
463         {
464                 LWLockTranchesAllocated = 16;
465                 LWLockTrancheArray = (LWLockTranche **)
466                         MemoryContextAlloc(TopMemoryContext,
467                                                   LWLockTranchesAllocated * sizeof(LWLockTranche *));
468         }
469
470         MainLWLockTranche.name = "main";
471         MainLWLockTranche.array_base = MainLWLockArray;
472         MainLWLockTranche.array_stride = sizeof(LWLockPadded);
473         LWLockRegisterTranche(0, &MainLWLockTranche);
474 }
475
476 /*
477  * InitLWLockAccess - initialize backend-local state needed to hold LWLocks
478  */
479 void
480 InitLWLockAccess(void)
481 {
482 #ifdef LWLOCK_STATS
483         init_lwlock_stats();
484 #endif
485 }
486
487 /*
488  * LWLockAssign - assign a dynamically-allocated LWLock number
489  *
490  * We interlock this using the same spinlock that is used to protect
491  * ShmemAlloc().  Interlocking is not really necessary during postmaster
492  * startup, but it is needed if any user-defined code tries to allocate
493  * LWLocks after startup.
494  */
495 LWLock *
496 LWLockAssign(void)
497 {
498         LWLock     *result;
499         int                *LWLockCounter;
500
501         LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
502         SpinLockAcquire(ShmemLock);
503         if (LWLockCounter[0] >= LWLockCounter[1])
504         {
505                 SpinLockRelease(ShmemLock);
506                 elog(ERROR, "no more LWLocks available");
507         }
508         result = &MainLWLockArray[LWLockCounter[0]++].lock;
509         SpinLockRelease(ShmemLock);
510         return result;
511 }
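
/*
 * Hypothetical usage sketch for a loadable module (the function and variable
 * names below are illustrative, not part of this file): request a lock from
 * _PG_init while the library is being preloaded, then assign it once shared
 * memory has been set up, e.g. from a shmem_startup_hook.  A real module
 * would typically stash the assigned pointer in its own shared memory area
 * so that LWLockAssign() is called only once per requested lock.
 *
 *		static LWLock *my_ext_lock;
 *
 *		void
 *		_PG_init(void)
 *		{
 *			RequestAddinLWLocks(1);
 *		}
 *
 *		static void
 *		my_ext_shmem_startup(void)
 *		{
 *			my_ext_lock = LWLockAssign();
 *		}
 */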
512
513 /*
514  * Allocate a new tranche ID.
515  */
516 int
517 LWLockNewTrancheId(void)
518 {
519         int                     result;
520         int                *LWLockCounter;
521
522         LWLockCounter = (int *) ((char *) MainLWLockArray - 3 * sizeof(int));
523         SpinLockAcquire(ShmemLock);
524         result = LWLockCounter[2]++;
525         SpinLockRelease(ShmemLock);
526
527         return result;
528 }
529
530 /*
531  * Register a tranche ID in the lookup table for the current process.  This
532  * routine will save a pointer to the tranche object passed as an argument,
533  * so that object should be allocated in a backend-lifetime context
534  * (TopMemoryContext, static variable, or similar).
535  */
536 void
537 LWLockRegisterTranche(int tranche_id, LWLockTranche *tranche)
538 {
539         Assert(LWLockTrancheArray != NULL);
540
541         if (tranche_id >= LWLockTranchesAllocated)
542         {
543                 int                     i = LWLockTranchesAllocated;
544
545                 while (i <= tranche_id)
546                         i *= 2;
547
548                 LWLockTrancheArray = (LWLockTranche **)
549                         repalloc(LWLockTrancheArray,
550                                          i * sizeof(LWLockTranche *));
551                 LWLockTranchesAllocated = i;
552         }
553
554         LWLockTrancheArray[tranche_id] = tranche;
555 }
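
/*
 * Hypothetical sketch of setting up a custom tranche (identifiers such as
 * my_locks and NUM_MY_LOCKS are illustrative): allocate the locks in shared
 * memory, obtain a tranche ID, point a backend-lifetime LWLockTranche at the
 * array, register it, and initialize each lock with that ID.
 *
 *		static LWLockTranche my_tranche;
 *		static LWLock *my_locks;		-- points into shared memory
 *		int			tranche_id,
 *					i;
 *
 *		tranche_id = LWLockNewTrancheId();
 *		my_tranche.name = "my_extension";
 *		my_tranche.array_base = my_locks;
 *		my_tranche.array_stride = sizeof(LWLock);
 *		LWLockRegisterTranche(tranche_id, &my_tranche);
 *		for (i = 0; i < NUM_MY_LOCKS; i++)
 *			LWLockInitialize(&my_locks[i], tranche_id);
 */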
556
557 /*
558  * LWLockInitialize - initialize a new lwlock; it's initially unlocked
559  */
560 void
561 LWLockInitialize(LWLock *lock, int tranche_id)
562 {
563         SpinLockInit(&lock->mutex);
564         pg_atomic_init_u32(&lock->state, LW_FLAG_RELEASE_OK);
565 #ifdef LOCK_DEBUG
566         pg_atomic_init_u32(&lock->nwaiters, 0);
567 #endif
568         lock->tranche = tranche_id;
569         dlist_init(&lock->waiters);
570 }
571
572 /*
573  * Internal function that tries to atomically acquire the lwlock in the passed
574  * in mode.
575  *
576  * This function will not block waiting for a lock to become free - that's the
577  * caller's job.
578  *
579  * Returns true if the lock isn't free and we need to wait.
580  */
581 static bool
582 LWLockAttemptLock(LWLock *lock, LWLockMode mode)
583 {
584         AssertArg(mode == LW_EXCLUSIVE || mode == LW_SHARED);
585
586         /* loop until we've determined whether we could acquire the lock or not */
587         while (true)
588         {
589                 uint32 old_state;
590                 uint32 expected_state;
591                 uint32 desired_state;
592                 bool lock_free;
593
594                 old_state = pg_atomic_read_u32(&lock->state);
595                 expected_state = old_state;
596                 desired_state = expected_state;
597
598                 if (mode == LW_EXCLUSIVE)
599                 {
600                         lock_free = (expected_state & LW_LOCK_MASK) == 0;
601                         if (lock_free)
602                                 desired_state += LW_VAL_EXCLUSIVE;
603                 }
604                 else
605                 {
606                         lock_free = (expected_state & LW_VAL_EXCLUSIVE) == 0;
607                         if (lock_free)
608                                 desired_state += LW_VAL_SHARED;
609                 }
610
611                 /*
612                  * Attempt to swap in the state we are expecting. If we didn't see the
613                  * lock as free, that's just the old value. If we saw it as free, we'll
614                  * attempt to mark it acquired. The reason that we always swap in the
615                  * value is that this doubles as a memory barrier. We could try to be
616                  * smarter and only swap in values if we saw the lock as free, but
617                  * benchmarks haven't shown that to be beneficial so far.
618                  *
619                  * Retry if the value changed since we last looked at it.
620                  */
621                 if (pg_atomic_compare_exchange_u32(&lock->state,
622                                                                                    &expected_state, desired_state))
623                 {
624                         if (lock_free)
625                         {
626                                 /* Great! Got the lock. */
627 #ifdef LOCK_DEBUG
628                                 if (mode == LW_EXCLUSIVE)
629                                         lock->owner = MyProc;
630 #endif
631                                 return false;
632                         }
633                         else
634                                 return true; /* somebody else has the lock */
635                 }
636         }
637         pg_unreachable();
638 }
639
640 /*
641  * Wake up all the lockers that currently have a chance to acquire the lock.
642  */
643 static void
644 LWLockWakeup(LWLock *lock)
645 {
646         bool            new_release_ok;
647         bool            wokeup_somebody = false;
648         dlist_head      wakeup;
649         dlist_mutable_iter iter;
650 #ifdef LWLOCK_STATS
651         lwlock_stats *lwstats;
652
653         lwstats = get_lwlock_stats_entry(lock);
654 #endif
655
656         dlist_init(&wakeup);
657
658         new_release_ok = true;
659
660         /* Acquire mutex.  Time spent holding mutex should be short! */
661 #ifdef LWLOCK_STATS
662         lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
663 #else
664         SpinLockAcquire(&lock->mutex);
665 #endif
666
667         dlist_foreach_modify(iter, &lock->waiters)
668         {
669                 PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
670
671                 if (wokeup_somebody && waiter->lwWaitMode == LW_EXCLUSIVE)
672                         continue;
673
674                 dlist_delete(&waiter->lwWaitLink);
675                 dlist_push_tail(&wakeup, &waiter->lwWaitLink);
676
677                 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
678                 {
679                         /*
680                          * Prevent additional wakeups until the retryer gets to run. Backends
681                          * that are just waiting for the lock to become free don't retry
682                          * automatically.
683                          */
684                         new_release_ok = false;
685                         /*
686                          * Don't wake up (further) exclusive lockers.
687                          */
688                         wokeup_somebody = true;
689                 }
690
691                 /*
692                  * Once we've woken up an exclusive locker, there's no point in waking
693                  * up anybody else.
694                  */
695                 if (waiter->lwWaitMode == LW_EXCLUSIVE)
696                         break;
697         }
698
699         Assert(dlist_is_empty(&wakeup) || pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS);
700
701         /* Unset both flags at once if required */
702         if (!new_release_ok && dlist_is_empty(&wakeup))
703                 pg_atomic_fetch_and_u32(&lock->state,
704                                                                 ~(LW_FLAG_RELEASE_OK | LW_FLAG_HAS_WAITERS));
705         else if (!new_release_ok)
706                 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_RELEASE_OK);
707         else if (dlist_is_empty(&wakeup))
708                 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
709         else if (new_release_ok)
710                 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
711
712         /* We are done updating the shared state of the lock queue. */
713         SpinLockRelease(&lock->mutex);
714
715         /* Awaken any waiters I removed from the queue. */
716         dlist_foreach_modify(iter, &wakeup)
717         {
718                 PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
719
720                 LOG_LWDEBUG("LWLockRelease", lock, "release waiter");
721                 dlist_delete(&waiter->lwWaitLink);
722                 /*
723                  * Guarantee that lwWaiting being unset only becomes visible once the
724                  * unlink from the list has completed. Otherwise the target backend
725                  * could be woken up for some other reason and enqueue itself for a new
726                  * lock - if that happens before the list unlink happens, the list
727                  * would end up being corrupted.
728                  *
729                  * The barrier pairs with the SpinLockAcquire() when enqueueing for
730                  * another lock.
731                  */
732                 pg_write_barrier();
733                 waiter->lwWaiting = false;
734                 PGSemaphoreUnlock(&waiter->sem);
735         }
736 }
737
738 /*
739  * Add ourselves to the end of the queue.
740  *
741  * NB: Mode can be LW_WAIT_UNTIL_FREE here!
742  */
743 static void
744 LWLockQueueSelf(LWLock *lock, LWLockMode mode)
745 {
746 #ifdef LWLOCK_STATS
747         lwlock_stats *lwstats;
748
749         lwstats = get_lwlock_stats_entry(lock);
750 #endif
751
752         /*
753          * If we don't have a PGPROC structure, there's no way to wait. This
754          * should never occur, since MyProc should only be null during shared
755          * memory initialization.
756          */
757         if (MyProc == NULL)
758                 elog(PANIC, "cannot wait without a PGPROC structure");
759
760         if (MyProc->lwWaiting)
761                 elog(PANIC, "queueing for lock while waiting on another one");
762
763 #ifdef LWLOCK_STATS
764         lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
765 #else
766         SpinLockAcquire(&lock->mutex);
767 #endif
768
769         /* setting the flag is protected by the spinlock */
770         pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_HAS_WAITERS);
771
772         MyProc->lwWaiting = true;
773         MyProc->lwWaitMode = mode;
774
775         /* LW_WAIT_UNTIL_FREE waiters are always at the front of the queue */
776         if (mode == LW_WAIT_UNTIL_FREE)
777                 dlist_push_head(&lock->waiters, &MyProc->lwWaitLink);
778         else
779                 dlist_push_tail(&lock->waiters, &MyProc->lwWaitLink);
780
781         /* Can release the mutex now */
782         SpinLockRelease(&lock->mutex);
783
784 #ifdef LOCK_DEBUG
785         pg_atomic_fetch_add_u32(&lock->nwaiters, 1);
786 #endif
787
788 }
789
790 /*
791  * Remove ourselves from the waitlist.
792  *
793  * This is used if we queued ourselves because we thought we needed to sleep
794  * but, after further checking, we discovered that we don't actually need to
795  * do so.  If somebody else has already dequeued us and scheduled a wakeup,
796  * we absorb the superfluous wakeup before returning.
797  */
798 static void
799 LWLockDequeueSelf(LWLock *lock)
800 {
801         bool    found = false;
802         dlist_mutable_iter iter;
803
804 #ifdef LWLOCK_STATS
805         lwlock_stats *lwstats;
806
807         lwstats = get_lwlock_stats_entry(lock);
808
809         lwstats->dequeue_self_count++;
810 #endif
811
812 #ifdef LWLOCK_STATS
813         lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
814 #else
815         SpinLockAcquire(&lock->mutex);
816 #endif
817
818         /*
819          * We can't simply remove ourselves from the list; we need to iterate over
820          * all entries, as somebody else could already have dequeued us.
821          */
822         dlist_foreach_modify(iter, &lock->waiters)
823         {
824                 PGPROC *proc = dlist_container(PGPROC, lwWaitLink, iter.cur);
825                 if (proc == MyProc)
826                 {
827                         found = true;
828                         dlist_delete(&proc->lwWaitLink);
829                         break;
830                 }
831         }
832
833         if (dlist_is_empty(&lock->waiters) &&
834                 (pg_atomic_read_u32(&lock->state) & LW_FLAG_HAS_WAITERS) != 0)
835         {
836                 pg_atomic_fetch_and_u32(&lock->state, ~LW_FLAG_HAS_WAITERS);
837         }
838
839         SpinLockRelease(&lock->mutex);
840
841         /* clear waiting state again, nice for debugging */
842         if (found)
843                 MyProc->lwWaiting = false;
844         else
845         {
846                 int             extraWaits = 0;
847
848                 /*
849                  * Somebody else dequeued us and has or will wake us up. Deal with the
850                  * superfluous absorption of a wakeup.
851                  */
852
853                 /*
854                  * Reset the RELEASE_OK flag if somebody woke us before we removed
855                  * ourselves - they'll have cleared it.
856                  */
857                 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
858
859                 /*
860                  * Now wait for the scheduled wakeup, otherwise our ->lwWaiting would
861                  * get reset at some inconvenient point later. Most of the time this
862                  * will immediately return.
863                  */
864                 for (;;)
865                 {
866                         /* "false" means cannot accept cancel/die interrupt here. */
867                         PGSemaphoreLock(&MyProc->sem, false);
868                         if (!MyProc->lwWaiting)
869                                 break;
870                         extraWaits++;
871                 }
872
873                 /*
874                  * Fix the process wait semaphore's count for any absorbed wakeups.
875                  */
876                 while (extraWaits-- > 0)
877                         PGSemaphoreUnlock(&MyProc->sem);
878         }
879
880 #ifdef LOCK_DEBUG
881         {
882                 /* not waiting anymore */
883                 uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
884                 Assert(nwaiters < MAX_BACKENDS);
885         }
886 #endif
887 }
888
889 /*
890  * LWLockAcquire - acquire a lightweight lock in the specified mode
891  *
892  * If the lock is not available, sleep until it is.  Returns true if the lock
893  * was available immediately, false if we had to sleep.
894  *
895  * Side effect: cancel/die interrupts are held off until lock release.
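 *
 * A typical call site looks something like this (schematic only; 'lock' is
 * whatever LWLock protects the shared structure being touched):
 *
 *		LWLockAcquire(lock, LW_EXCLUSIVE);
 *		... examine or modify the shared data structure ...
 *		LWLockRelease(lock);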
896  */
897 bool
898 LWLockAcquire(LWLock *l, LWLockMode mode)
899 {
900         return LWLockAcquireCommon(l, mode, NULL, 0);
901 }
902
903 /*
904  * LWLockAcquireWithVar - like LWLockAcquire, but also sets *valptr = val
905  *
906  * The lock is always acquired in exclusive mode with this function.
907  */
908 bool
909 LWLockAcquireWithVar(LWLock *l, uint64 *valptr, uint64 val)
910 {
911         return LWLockAcquireCommon(l, LW_EXCLUSIVE, valptr, val);
912 }
913
914 /* internal function to implement LWLockAcquire and LWLockAcquireWithVar */
915 static inline bool
916 LWLockAcquireCommon(LWLock *lock, LWLockMode mode, uint64 *valptr, uint64 val)
917 {
918         PGPROC     *proc = MyProc;
919         bool            result = true;
920         int                     extraWaits = 0;
921 #ifdef LWLOCK_STATS
922         lwlock_stats *lwstats;
923
924         lwstats = get_lwlock_stats_entry(lock);
925 #endif
926
927         AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
928
929         PRINT_LWDEBUG("LWLockAcquire", lock, mode);
930
931 #ifdef LWLOCK_STATS
932         /* Count lock acquisition attempts */
933         if (mode == LW_EXCLUSIVE)
934                 lwstats->ex_acquire_count++;
935         else
936                 lwstats->sh_acquire_count++;
937 #endif   /* LWLOCK_STATS */
938
939         /*
940          * We can't wait if we haven't got a PGPROC.  This should only occur
941          * during bootstrap or shared memory initialization.  Put an Assert here
942          * to catch unsafe coding practices.
943          */
944         Assert(!(proc == NULL && IsUnderPostmaster));
945
946         /* Ensure we will have room to remember the lock */
947         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
948                 elog(ERROR, "too many LWLocks taken");
949
950         /*
951          * Lock out cancel/die interrupts until we exit the code section protected
952          * by the LWLock.  This ensures that interrupts will not interfere with
953          * manipulations of data structures in shared memory.
954          */
955         HOLD_INTERRUPTS();
956
957         /*
958          * Loop here to try to acquire lock after each time we are signaled by
959          * LWLockRelease.
960          *
961          * NOTE: it might seem better to have LWLockRelease actually grant us the
962          * lock, rather than retrying and possibly having to go back to sleep. But
963          * in practice that is no good because it means a process swap for every
964          * lock acquisition when two or more processes are contending for the same
965          * lock.  Since LWLocks are normally used to protect not-very-long
966          * sections of computation, a process needs to be able to acquire and
967          * release the same lock many times during a single CPU time slice, even
968          * in the presence of contention.  The efficiency of being able to do that
969          * outweighs the inefficiency of sometimes wasting a process dispatch
970          * cycle because the lock is not free when a released waiter finally gets
971          * to run.  See pgsql-hackers archives for 29-Dec-01.
972          */
973         for (;;)
974         {
975                 bool            mustwait;
976
977                 /*
978                  * Try to grab the lock the first time; we're not in the waitqueue
979                  * yet/anymore.
980                  */
981                 mustwait = LWLockAttemptLock(lock, mode);
982
983                 if (!mustwait)
984                 {
985                         /* XXX: remove before commit? */
986                         LOG_LWDEBUG("LWLockAcquire", lock, "immediately acquired lock");
987                         break;                          /* got the lock */
988                 }
989
990                 /*
991                  * Ok, at this point we couldn't grab the lock on the first try. We
992                  * cannot simply queue ourselves to the end of the list and wait to be
993                  * woken up because by now the lock could long since have been released.
994                  * Instead add ourselves to the queue and try to grab the lock again. If
995                  * we succeed we need to revert the queuing and be happy, otherwise we
996                  * recheck the lock. If we still couldn't grab it, we know that the
997                  * other locker will see our queue entries when releasing since they
998                  * existed before we checked for the lock.
999                  */
1000
1001                 /* add to the queue */
1002                 LWLockQueueSelf(lock, mode);
1003
1004                 /* we're now guaranteed to be woken up if necessary */
1005                 mustwait = LWLockAttemptLock(lock, mode);
1006
1007                 /* ok, grabbed the lock the second time round, need to undo queueing */
1008                 if (!mustwait)
1009                 {
1010                         LOG_LWDEBUG("LWLockAcquire", lock, "acquired, undoing queue");
1011
1012                         LWLockDequeueSelf(lock);
1013                         break;
1014                 }
1015
1016                 /*
1017                  * Wait until awakened.
1018                  *
1019                  * Since we share the process wait semaphore with the regular lock
1020                  * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1021                  * while one of those is pending, it is possible that we get awakened
1022                  * for a reason other than being signaled by LWLockRelease. If so,
1023                  * loop back and wait again.  Once we've gotten the LWLock,
1024                  * re-increment the sema by the number of additional signals received,
1025                  * so that the lock manager or signal manager will see the received
1026                  * signal when it next waits.
1027                  */
1028                 LOG_LWDEBUG("LWLockAcquire", lock, "waiting");
1029
1030 #ifdef LWLOCK_STATS
1031                 lwstats->block_count++;
1032 #endif
1033
1034                 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
1035
1036                 for (;;)
1037                 {
1038                         /* "false" means cannot accept cancel/die interrupt here. */
1039                         PGSemaphoreLock(&proc->sem, false);
1040                         if (!proc->lwWaiting)
1041                                 break;
1042                         extraWaits++;
1043                 }
1044
1045                 /* Retrying, allow LWLockRelease to release waiters again. */
1046                 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1047
1048 #ifdef LOCK_DEBUG
1049                 {
1050                         /* not waiting anymore */
1051                         uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1052                         Assert(nwaiters < MAX_BACKENDS);
1053                 }
1054 #endif
1055
1056                 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
1057
1058                 LOG_LWDEBUG("LWLockAcquire", lock, "awakened");
1059
1060                 /* Now loop back and try to acquire lock again. */
1061                 result = false;
1062         }
1063
1064         /* If there's a variable associated with this lock, initialize it */
1065         if (valptr)
1066                 *valptr = val;
1067
1068         TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), mode);
1069
1070         /* Add lock to list of locks held by this backend */
1071         held_lwlocks[num_held_lwlocks].lock = lock;
1072         held_lwlocks[num_held_lwlocks++].mode = mode;
1073
1074         /*
1075          * Fix the process wait semaphore's count for any absorbed wakeups.
1076          */
1077         while (extraWaits-- > 0)
1078                 PGSemaphoreUnlock(&proc->sem);
1079
1080         return result;
1081 }
1082
1083 /*
1084  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
1085  *
1086  * If the lock is not available, return FALSE with no side-effects.
1087  *
1088  * If successful, cancel/die interrupts are held off until lock release.
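 *
 * Schematic usage (illustrative only):
 *
 *		if (LWLockConditionalAcquire(lock, LW_SHARED))
 *		{
 *			... fast path: use the shared structure ...
 *			LWLockRelease(lock);
 *		}
 *		else
 *		{
 *			... lock is busy: fall back to some other strategy ...
 *		}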
1089  */
1090 bool
1091 LWLockConditionalAcquire(LWLock *lock, LWLockMode mode)
1092 {
1093         bool            mustwait;
1094
1095         AssertArg(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1096
1097         PRINT_LWDEBUG("LWLockConditionalAcquire", lock, mode);
1098
1099         /* Ensure we will have room to remember the lock */
1100         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1101                 elog(ERROR, "too many LWLocks taken");
1102
1103         /*
1104          * Lock out cancel/die interrupts until we exit the code section protected
1105          * by the LWLock.  This ensures that interrupts will not interfere with
1106          * manipulations of data structures in shared memory.
1107          */
1108         HOLD_INTERRUPTS();
1109
1110         /* Check for the lock */
1111         mustwait = LWLockAttemptLock(lock, mode);
1112
1113         if (mustwait)
1114         {
1115                 /* Failed to get lock, so release interrupt holdoff */
1116                 RESUME_INTERRUPTS();
1117
1118                 LOG_LWDEBUG("LWLockConditionalAcquire", lock, "failed");
1119                 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(T_NAME(lock), T_ID(lock), mode);
1120         }
1121         else
1122         {
1123                 /* Add lock to list of locks held by this backend */
1124                 held_lwlocks[num_held_lwlocks].lock = lock;
1125                 held_lwlocks[num_held_lwlocks++].mode = mode;
1126                 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(T_NAME(lock), T_ID(lock), mode);
1127         }
1128         return !mustwait;
1129 }
1130
1131 /*
1132  * LWLockAcquireOrWait - Acquire lock, or wait until it's free
1133  *
1134  * The semantics of this function are a bit funky.  If the lock is currently
1135  * free, it is acquired in the given mode, and the function returns true.  If
1136  * the lock isn't immediately free, the function waits until it is released
1137  * and returns false, but does not acquire the lock.
1138  *
1139  * This is currently used for WALWriteLock: when a backend flushes the WAL,
1140  * holding WALWriteLock, it can flush the commit records of many other
1141  * backends as a side-effect.  Those other backends need to wait until the
1142  * flush finishes, but don't need to acquire the lock anymore.  They can just
1143  * wake up, observe that their records have already been flushed, and return.
1144  */
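 *
 * Schematically (simplified from that WAL-flush usage; the flush logic and
 * error handling are omitted):
 *
 *		if (LWLockAcquireOrWait(WALWriteLock, LW_EXCLUSIVE))
 *		{
 *			... perform the flush ourselves ...
 *			LWLockRelease(WALWriteLock);
 *		}
 *		else
 *		{
 *			... the previous holder has finished and released the lock;
 *			... recheck whether our records were already flushed for us ...
 *		}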
1145 bool
1146 LWLockAcquireOrWait(LWLock *lock, LWLockMode mode)
1147 {
1148         PGPROC     *proc = MyProc;
1149         bool            mustwait;
1150         int                     extraWaits = 0;
1151 #ifdef LWLOCK_STATS
1152         lwlock_stats *lwstats;
1153
1154         lwstats = get_lwlock_stats_entry(lock);
1155 #endif
1156
1157         Assert(mode == LW_SHARED || mode == LW_EXCLUSIVE);
1158
1159         PRINT_LWDEBUG("LWLockAcquireOrWait", lock, mode);
1160
1161         /* Ensure we will have room to remember the lock */
1162         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
1163                 elog(ERROR, "too many LWLocks taken");
1164
1165         /*
1166          * Lock out cancel/die interrupts until we exit the code section protected
1167          * by the LWLock.  This ensures that interrupts will not interfere with
1168          * manipulations of data structures in shared memory.
1169          */
1170         HOLD_INTERRUPTS();
1171
1172         /*
1173          * NB: We're using nearly the same twice-in-a-row lock acquisition
1174          * protocol as LWLockAcquire(). Check its comments for details.
1175          */
1176         mustwait = LWLockAttemptLock(lock, mode);
1177
1178         if (mustwait)
1179         {
1180                 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1181
1182                 mustwait = LWLockAttemptLock(lock, mode);
1183
1184                 if (mustwait)
1185                 {
1186                         /*
1187                          * Wait until awakened.  Like in LWLockAcquire, be prepared for bogus
1188                          * wakeups, because we share the semaphore with ProcWaitForSignal.
1189                          */
1190                         LOG_LWDEBUG("LWLockAcquireOrWait", lock, "waiting");
1191
1192 #ifdef LWLOCK_STATS
1193                         lwstats->block_count++;
1194 #endif
1195                         TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock), mode);
1196
1197                         for (;;)
1198                         {
1199                                 /* "false" means cannot accept cancel/die interrupt here. */
1200                                 PGSemaphoreLock(&proc->sem, false);
1201                                 if (!proc->lwWaiting)
1202                                         break;
1203                                 extraWaits++;
1204                         }
1205
1206 #ifdef LOCK_DEBUG
1207                         {
1208                                 /* not waiting anymore */
1209                                 uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1210                                 Assert(nwaiters < MAX_BACKENDS);
1211                         }
1212 #endif
1213                         TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock), mode);
1214
1215                         LOG_LWDEBUG("LWLockAcquireOrWait", lock, "awakened");
1216                 }
1217                 else
1218                 {
1219                         LOG_LWDEBUG("LWLockAcquireOrWait", lock, "acquired, undoing queue");
1220
1221                         /*
1222                          * Got lock in the second attempt, undo queueing. We need to
1223                          * treat this as having successfully acquired the lock, otherwise
1224                          * we'd not necessarily wake up people we've prevented from
1225                          * acquiring the lock.
1226                           */
1227                         LWLockDequeueSelf(lock);
1228                 }
1229         }
1230
1231         /*
1232          * Fix the process wait semaphore's count for any absorbed wakeups.
1233          */
1234         while (extraWaits-- > 0)
1235                 PGSemaphoreUnlock(&proc->sem);
1236
1237         if (mustwait)
1238         {
1239                 /* Failed to get lock, so release interrupt holdoff */
1240                 RESUME_INTERRUPTS();
1241                 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "failed");
1242                 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT_FAIL(T_NAME(lock), T_ID(lock),
1243                                                                                                          mode);
1244         }
1245         else
1246         {
1247                 LOG_LWDEBUG("LWLockAcquireOrWait", lock, "succeeded");
1248                 /* Add lock to list of locks held by this backend */
1249                 held_lwlocks[num_held_lwlocks].lock = lock;
1250                 held_lwlocks[num_held_lwlocks++].mode = mode;
1251                 TRACE_POSTGRESQL_LWLOCK_ACQUIRE_OR_WAIT(T_NAME(lock), T_ID(lock), mode);
1252         }
1253
1254         return !mustwait;
1255 }
1256
1257 /*
1258  * LWLockWaitForVar - Wait until lock is free, or a variable is updated.
1259  *
1260  * If the lock is held and *valptr equals oldval, waits until the lock is
1261  * either freed, or the lock holder updates *valptr by calling
1262  * LWLockUpdateVar.  If the lock is free on exit (immediately or after
1263  * waiting), returns true.  If the lock is still held, but *valptr no longer
1264  * matches oldval, returns false and sets *newval to the current value in
1265  * *valptr.
1266  *
1267  * It's possible that the lock holder releases the lock, but another backend
1268  * acquires it again before we get a chance to observe that the lock was
1269  * momentarily released.  We wouldn't need to wait for the new lock holder,
1270  * but we cannot distinguish that case, so we will have to wait.
1271  *
1272  * Note: this function ignores shared lock holders; if the lock is held
1273  * in shared mode, returns 'true'.
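 *
 * Schematic caller loop (names are illustrative; 'target' stands for
 * whatever progress point the caller is waiting for):
 *
 *		uint64		seen = 0;
 *
 *		while (!LWLockWaitForVar(lock, valptr, seen, &seen))
 *		{
 *			-- lock still held, but *valptr changed; 'seen' has the new value
 *			if (seen >= target)
 *				break;
 *		}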
1274  */
1275 bool
1276 LWLockWaitForVar(LWLock *lock, uint64 *valptr, uint64 oldval, uint64 *newval)
1277 {
1278         PGPROC     *proc = MyProc;
1279         int                     extraWaits = 0;
1280         bool            result = false;
1281 #ifdef LWLOCK_STATS
1282         lwlock_stats *lwstats;
1283
1284         lwstats = get_lwlock_stats_entry(lock);
1285 #endif
1286
1287         PRINT_LWDEBUG("LWLockWaitForVar", lock, LW_WAIT_UNTIL_FREE);
1288
1289         /*
1290          * Quick test first to see if the lock is free right now.
1291          *
1292          * XXX: the caller uses a spinlock before this, so we don't need a memory
1293          * barrier here as far as the current usage is concerned.  But that might
1294          * not be safe in general.
1295          */
1296         if ((pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) == 0)
1297                 return true;
1298
1299         /*
1300          * Lock out cancel/die interrupts while we sleep on the lock.  There is no
1301          * cleanup mechanism to remove us from the wait queue if we got
1302          * interrupted.
1303          */
1304         HOLD_INTERRUPTS();
1305
1306         /*
1307          * Loop here to check the lock's status after each time we are signaled.
1308          */
1309         for (;;)
1310         {
1311                 bool            mustwait;
1312                 uint64          value;
1313
1314                 mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1315
1316                 if (mustwait)
1317                 {
1318                         /*
1319                          * Perform the comparison under the spinlock, as we can't rely on
1320                          * 64-bit reads/stores being atomic.
1321                          */
1322 #ifdef LWLOCK_STATS
1323                         lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
1324 #else
1325                         SpinLockAcquire(&lock->mutex);
1326 #endif
1327
1328                         /*
1329                          * XXX: We can significantly optimize this on platforms with 64bit
1330                          * atomics.
1331                          */
1332                         value = *valptr;
1333                         if (value != oldval)
1334                         {
1335                                 result = false;
1336                                 mustwait = false;
1337                                 *newval = value;
1338                         }
1339                         else
1340                                 mustwait = true;
1341                         SpinLockRelease(&lock->mutex);
1342                 }
1343                 else
1344                         mustwait = false;
1345
1346                 if (!mustwait)
1347                         break;                          /* the lock was free or value didn't match */
1348
1349                 /*
1350                  * Add myself to the wait queue. Note that this is racy: somebody else
1351                  * could wake us up before we're finished queueing.
1352                  * NB: We're using nearly the same twice-in-a-row lock acquisition
1353                  * protocol as LWLockAcquire(). Check its comments for details.
1354                  */
1355                 LWLockQueueSelf(lock, LW_WAIT_UNTIL_FREE);
1356
1357                 /*
1358                  * Set RELEASE_OK flag, to make sure we get woken up as soon as the
1359                  * lock is released.
1360                  */
1361                 pg_atomic_fetch_or_u32(&lock->state, LW_FLAG_RELEASE_OK);
1362
1363                 /*
1364                  * We're now guaranteed to be woken up if necessary. Recheck the
1365                  * lock's state.
1366                  */
1367                 mustwait = (pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE) != 0;
1368
1369                 /* Ok, lock is free after we queued ourselves. Undo queueing. */
1370                 if (!mustwait)
1371                 {
1372                         LOG_LWDEBUG("LWLockWaitForVar", lock, "free, undoing queue");
1373
1374                         LWLockDequeueSelf(lock);
1375                         break;
1376                 }
1377
1378                 /*
1379                  * Wait until awakened.
1380                  *
1381                  * Since we share the process wait semaphore with the regular lock
1382                  * manager and ProcWaitForSignal, and we may need to acquire an LWLock
1383                  * while one of those is pending, it is possible that we get awakened
1384                  * for a reason other than being signaled by LWLockRelease. If so,
1385                  * loop back and wait again.  Once we've gotten the LWLock,
1386                  * re-increment the sema by the number of additional signals received,
1387                  * so that the lock manager or signal manager will see the received
1388                  * signal when it next waits.
1389                  */
1390                 LOG_LWDEBUG("LWLockWaitForVar", lock, "waiting");
1391
1392 #ifdef LWLOCK_STATS
1393                 lwstats->block_count++;
1394 #endif
1395
1396                 TRACE_POSTGRESQL_LWLOCK_WAIT_START(T_NAME(lock), T_ID(lock),
1397                                                                                    LW_EXCLUSIVE);
1398
1399                 for (;;)
1400                 {
1401                         /* "false" means cannot accept cancel/die interrupt here. */
1402                         PGSemaphoreLock(&proc->sem, false);
1403                         if (!proc->lwWaiting)
1404                                 break;
1405                         extraWaits++;
1406                 }
1407
1408 #ifdef LOCK_DEBUG
1409                 {
1410                         /* not waiting anymore */
1411                         uint32 nwaiters = pg_atomic_fetch_sub_u32(&lock->nwaiters, 1);
1412                         Assert(nwaiters < MAX_BACKENDS);
1413                 }
1414 #endif
1415
1416                 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(T_NAME(lock), T_ID(lock),
1417                                                                                   LW_EXCLUSIVE);
1418
1419                 LOG_LWDEBUG("LWLockWaitForVar", lock, "awakened");
1420
1421                 /* Now loop back and check the status of the lock again. */
1422         }
1423
1424         TRACE_POSTGRESQL_LWLOCK_ACQUIRE(T_NAME(lock), T_ID(lock), LW_EXCLUSIVE);
1425
1426         /*
1427          * Fix the process wait semaphore's count for any absorbed wakeups.
1428          */
1429         while (extraWaits-- > 0)
1430                 PGSemaphoreUnlock(&proc->sem);
1431
1432         /*
1433          * Now okay to allow cancel/die interrupts.
1434          */
1435         RESUME_INTERRUPTS();
1436
1437         return result;
1438 }
1439
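/*
 * For illustration only: a minimal sketch of the waiter side of the
 * variable-wait protocol implemented by LWLockWaitForVar() above.  The
 * function and parameter names here are hypothetical; real callers (such as
 * the WAL-insertion wait logic) follow the same shape.
 */
static void
ExampleWaitForProgress(LWLock *lock, uint64 *progress, uint64 upto)
{
	uint64		seenval = 0;

	for (;;)
	{
		uint64		newval;

		/*
		 * Sleep until the exclusive holder advances the variable past the
		 * value we last saw, or releases the lock entirely.
		 */
		if (LWLockWaitForVar(lock, progress, seenval, &newval))
			break;				/* lock is free, nothing left to wait for */

		if (newval >= upto)
			break;				/* variable has advanced far enough */

		seenval = newval;		/* remember it and wait again */
	}
}
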
1440
1441 /*
1442  * LWLockUpdateVar - Update a variable and wake up waiters atomically
1443  *
1444  * Sets *valptr to 'val', and wakes up all processes waiting for us with
1445  * LWLockWaitForVar().  Setting the value and waking up the processes happen
1446  * atomically so that any process calling LWLockWaitForVar() on the same lock
1447  * is guaranteed to see the new value, and act accordingly.
1448  *
1449  * The caller must be holding the lock in exclusive mode.
1450  */
1451 void
1452 LWLockUpdateVar(LWLock *lock, uint64 *valptr, uint64 val)
1453 {
1454         dlist_head      wakeup;
1455         dlist_mutable_iter iter;
1456 #ifdef LWLOCK_STATS
1457         lwlock_stats *lwstats;
1458
1459         lwstats = get_lwlock_stats_entry(lock);
1460 #endif
1461
1462         PRINT_LWDEBUG("LWLockUpdateVar", lock, LW_EXCLUSIVE);
1463
1464         dlist_init(&wakeup);
1465
1466         /* Acquire mutex.  Time spent holding mutex should be short! */
1467 #ifdef LWLOCK_STATS
1468         lwstats->spin_delay_count += SpinLockAcquire(&lock->mutex);
1469 #else
1470         SpinLockAcquire(&lock->mutex);
1471 #endif
1472
1473         Assert(pg_atomic_read_u32(&lock->state) & LW_VAL_EXCLUSIVE);
1474
1475         /* Update the lock's value */
1476         *valptr = val;
1477
1478         /*
1479          * See if there are any LW_WAIT_UNTIL_FREE waiters that need to be woken
1480          * up. They are always in the front of the queue.
1481          */
1482         dlist_foreach_modify(iter, &lock->waiters)
1483         {
1484                 PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
1485
1486                 if (waiter->lwWaitMode != LW_WAIT_UNTIL_FREE)
1487                         break;
1488
1489                 dlist_delete(&waiter->lwWaitLink);
1490                 dlist_push_tail(&wakeup, &waiter->lwWaitLink);
1491         }
1492
1493         /* We are done updating shared state of the lock itself. */
1494         SpinLockRelease(&lock->mutex);
1495
1496         /*
1497          * Awaken any waiters I removed from the queue.
1498          */
1499         dlist_foreach_modify(iter, &wakeup)
1500         {
1501                 PGPROC *waiter = dlist_container(PGPROC, lwWaitLink, iter.cur);
1502                 dlist_delete(&waiter->lwWaitLink);
1503                 /* check comment in LWLockWakeup() about this barrier */
1504                 pg_write_barrier();
1505                 waiter->lwWaiting = false;
1506                 PGSemaphoreUnlock(&waiter->sem);
1507         }
1508 }
1509
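/*
 * For illustration only: the matching holder side of the variable-wait
 * protocol, again with hypothetical names.  The exclusive holder publishes
 * intermediate progress through LWLockUpdateVar() without releasing the
 * lock, so LWLockWaitForVar() callers can stop waiting early; releasing the
 * lock then wakes any remaining LW_WAIT_UNTIL_FREE waiters.
 */
static void
ExamplePublishProgress(LWLock *lock, uint64 *progress, uint64 start, uint64 end)
{
	uint64		pos;

	/* Acquire exclusively and initialize the protected variable. */
	LWLockAcquireWithVar(lock, progress, start);

	for (pos = start; pos < end; pos++)
	{
		/* ... perform one unit of work covered by the lock ... */

		/* Tell waiters how far we have gotten, while still holding the lock. */
		LWLockUpdateVar(lock, progress, pos + 1);
	}

	LWLockRelease(lock);
}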
1510
1511 /*
1512  * LWLockRelease - release a previously acquired lock
1513  */
1514 void
1515 LWLockRelease(LWLock *lock)
1516 {
1517         LWLockMode      mode;
1518         uint32          oldstate;
1519         bool            check_waiters;
1520         int                     i;
1521
1522         /*
1523          * Remove lock from list of locks held.  Usually, but not always, it will
1524          * be the latest-acquired lock; so search array backwards.
1525          */
1526         for (i = num_held_lwlocks; --i >= 0;)
1527         {
1528                 if (lock == held_lwlocks[i].lock)
1529                 {
1530                         mode = held_lwlocks[i].mode;
1531                         break;
1532                 }
1533         }
1534         if (i < 0)
1535                 elog(ERROR, "lock %s %d is not held", T_NAME(lock), T_ID(lock));
1536         num_held_lwlocks--;
1537         for (; i < num_held_lwlocks; i++)
1538                 held_lwlocks[i] = held_lwlocks[i + 1];
1539
1540         PRINT_LWDEBUG("LWLockRelease", lock, mode);
1541
1542         /*
1543          * Release my hold on the lock; after that it can immediately be acquired
1544          * by others, even if we still have to wake up other waiters.
1545          */
1546         if (mode == LW_EXCLUSIVE)
1547                 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_EXCLUSIVE);
1548         else
1549                 oldstate = pg_atomic_sub_fetch_u32(&lock->state, LW_VAL_SHARED);
1550
1551         /* nobody else can have that kind of lock */
1552         Assert(!(oldstate & LW_VAL_EXCLUSIVE));
1553
1554
1555         /*
1556          * If RELEASE_OK is clear, previously-woken backends are still waiting to
1557          * be scheduled; don't wake them up again.
1558          */
1559         if ((oldstate & (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK)) ==
1560                 (LW_FLAG_HAS_WAITERS | LW_FLAG_RELEASE_OK) &&
1561                 (oldstate & LW_LOCK_MASK) == 0)
1562                 check_waiters = true;
1563         else
1564                 check_waiters = false;
1565
1566         /*
1567          * As waking up waiters requires the spinlock to be acquired, only do so
1568          * if necessary.
1569          */
1570         if (check_waiters)
1571         {
1572                 /* XXX: remove before commit? */
1573                 LOG_LWDEBUG("LWLockRelease", lock, "releasing waiters");
1574                 LWLockWakeup(lock);
1575         }
1576
1577         TRACE_POSTGRESQL_LWLOCK_RELEASE(T_NAME(lock), T_ID(lock));
1578
1579         /*
1580          * Now okay to allow cancel/die interrupts.
1581          */
1582         RESUME_INTERRUPTS();
1583 }
1584
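/*
 * A worked example of the release path above, given the state-word layout
 * described in the file header (shared holders counted in the lock count,
 * LW_VAL_EXCLUSIVE as a sentinel value, flag bits above them):
 *
 * - One shared holder, one queued waiter, RELEASE_OK set: the
 *   pg_atomic_sub_fetch_u32() leaves (state & LW_LOCK_MASK) == 0 with both
 *   flag bits still set, so check_waiters is true and LWLockWakeup() runs.
 *
 * - Two shared holders, one queued waiter: the first release leaves
 *   (state & LW_LOCK_MASK) != 0, so it skips the wakeup; only the last
 *   release wakes the waiter.
 */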
1585
1586 /*
1587  * LWLockReleaseAll - release all currently-held locks
1588  *
1589  * Used to clean up after ereport(ERROR). An important difference between this
1590  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
1591  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
1592  * has been set to an appropriate level earlier in error recovery. We could
1593  * decrement it below zero if we allow it to drop for each released lock!
1594  */
1595 void
1596 LWLockReleaseAll(void)
1597 {
1598         while (num_held_lwlocks > 0)
1599         {
1600                 HOLD_INTERRUPTS();              /* match the upcoming RESUME_INTERRUPTS */
1601
1602                 LWLockRelease(held_lwlocks[num_held_lwlocks - 1].lock);
1603         }
1604 }
1605
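/*
 * For illustration only: a hypothetical error-recovery routine showing the
 * intended calling environment for LWLockReleaseAll().  The caller has
 * already established the interrupt holdoff level it wants and relies on
 * LWLockReleaseAll() leaving InterruptHoldoffCount alone; the per-lock
 * HOLD_INTERRUPTS() in the loop above exists only to balance the
 * RESUME_INTERRUPTS() performed by each LWLockRelease().
 */
static void
ExampleAbortCleanup(void)
{
	HOLD_INTERRUPTS();			/* recovery code sets its own holdoff level */

	LWLockReleaseAll();			/* releases all locks; holdoff level unchanged */

	/* ... other per-subsystem cleanup would go here ... */

	RESUME_INTERRUPTS();
}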
1606
1607 /*
1608  * LWLockHeldByMe - test whether my process currently holds a lock
1609  *
1610  * This is meant as debug support only.  We currently do not distinguish
1611  * whether the lock is held shared or exclusive.
1612  */
1613 bool
1614 LWLockHeldByMe(LWLock *l)
1615 {
1616         int                     i;
1617
1618         for (i = 0; i < num_held_lwlocks; i++)
1619         {
1620                 if (held_lwlocks[i].lock == l)
1621                         return true;
1622         }
1623         return false;
1624 }
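
/*
 * For illustration only: LWLockHeldByMe() is meant for sanity checks, most
 * commonly to assert that a routine requiring a particular lock really is
 * called with that lock held.  A hypothetical sketch:
 */
static void
ExampleTouchProtectedState(LWLock *lock)
{
	/* caller must already hold the lock, in either mode */
	Assert(LWLockHeldByMe(lock));

	/* ... manipulate the data structure protected by 'lock' ... */
}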