]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lwlock.c
036c32fad026eaba08e6abbb102cdd81a1133249
[postgresql] / src / backend / storage / lmgr / lwlock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lwlock.c
4  *        Lightweight lock manager
5  *
6  * Lightweight locks are intended primarily to provide mutual exclusion of
7  * access to shared-memory data structures.  Therefore, they offer both
8  * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object).  There are few other frammishes.  User-level
10  * locking should be done with the full lock manager --- which depends on
11  * LWLocks to protect its shared state.
12  *
13  *
14  * Portions Copyright (c) 1996-2008, PostgreSQL Global Development Group
15  * Portions Copyright (c) 1994, Regents of the University of California
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.50 2008/01/01 19:45:52 momjian Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include "access/clog.h"
25 #include "access/multixact.h"
26 #include "access/subtrans.h"
27 #include "miscadmin.h"
28 #include "storage/ipc.h"
29 #include "storage/proc.h"
30 #include "storage/spin.h"
31
32
33 /* We use the ShmemLock spinlock to protect LWLockAssign */
34 extern slock_t *ShmemLock;
35
36
37 typedef struct LWLock
38 {
39         slock_t         mutex;                  /* Protects LWLock and queue of PGPROCs */
40         bool            releaseOK;              /* T if ok to release waiters */
41         char            exclusive;              /* # of exclusive holders (0 or 1) */
42         int                     shared;                 /* # of shared holders (0..MaxBackends) */
43         PGPROC     *head;                       /* head of list of waiting PGPROCs */
44         PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
45         /* tail is undefined when head is NULL */
46 } LWLock;
47
48 /*
49  * All the LWLock structs are allocated as an array in shared memory.
50  * (LWLockIds are indexes into the array.)      We force the array stride to
51  * be a power of 2, which saves a few cycles in indexing, but more
52  * importantly also ensures that individual LWLocks don't cross cache line
53  * boundaries.  This reduces cache contention problems, especially on AMD
54  * Opterons.  (Of course, we have to also ensure that the array start
55  * address is suitably aligned.)
56  *
57  * LWLock is between 16 and 32 bytes on all known platforms, so these two
58  * cases are sufficient.
59  */
60 #define LWLOCK_PADDED_SIZE      (sizeof(LWLock) <= 16 ? 16 : 32)
61
62 typedef union LWLockPadded
63 {
64         LWLock          lock;
65         char            pad[LWLOCK_PADDED_SIZE];
66 } LWLockPadded;
67
68 /*
69  * This points to the array of LWLocks in shared memory.  Backends inherit
70  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
71  * where we have special measures to pass it down).
72  */
73 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
74
75
76 /*
77  * We use this structure to keep track of locked LWLocks for release
78  * during error recovery.  The maximum size could be determined at runtime
79  * if necessary, but it seems unlikely that more than a few locks could
80  * ever be held simultaneously.
81  */
82 #define MAX_SIMUL_LWLOCKS       100
83
84 static int      num_held_lwlocks = 0;
85 static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
86
87 static int      lock_addin_request = 0;
88 static bool lock_addin_request_allowed = true;
89
90 #ifdef LWLOCK_STATS
91 static int      counts_for_pid = 0;
92 static int *sh_acquire_counts;
93 static int *ex_acquire_counts;
94 static int *block_counts;
95 #endif
96
97 #ifdef LOCK_DEBUG
98 bool            Trace_lwlocks = false;
99
100 inline static void
101 PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
102 {
103         if (Trace_lwlocks)
104                 elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
105                          where, (int) lockid,
106                          (int) lock->exclusive, lock->shared, lock->head,
107                          (int) lock->releaseOK);
108 }
109
110 inline static void
111 LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
112 {
113         if (Trace_lwlocks)
114                 elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
115 }
116 #else                                                   /* not LOCK_DEBUG */
117 #define PRINT_LWDEBUG(a,b,c)
118 #define LOG_LWDEBUG(a,b,c)
119 #endif   /* LOCK_DEBUG */
120
121 #ifdef LWLOCK_STATS
122
123 static void
124 print_lwlock_stats(int code, Datum arg)
125 {
126         int                     i;
127         int                *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
128         int                     numLocks = LWLockCounter[1];
129
130         /* Grab an LWLock to keep different backends from mixing reports */
131         LWLockAcquire(0, LW_EXCLUSIVE);
132
133         for (i = 0; i < numLocks; i++)
134         {
135                 if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i])
136                         fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u\n",
137                                         MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
138                                         block_counts[i]);
139         }
140
141         LWLockRelease(0);
142 }
143 #endif   /* LWLOCK_STATS */
144
145
146 /*
147  * Compute number of LWLocks to allocate.
148  */
149 int
150 NumLWLocks(void)
151 {
152         int                     numLocks;
153
154         /*
155          * Possibly this logic should be spread out among the affected modules,
156          * the same way that shmem space estimation is done.  But for now, there
157          * are few enough users of LWLocks that we can get away with just keeping
158          * the knowledge here.
159          */
160
161         /* Predefined LWLocks */
162         numLocks = (int) NumFixedLWLocks;
163
164         /* bufmgr.c needs two for each shared buffer */
165         numLocks += 2 * NBuffers;
166
167         /* clog.c needs one per CLOG buffer */
168         numLocks += NUM_CLOG_BUFFERS;
169
170         /* subtrans.c needs one per SubTrans buffer */
171         numLocks += NUM_SUBTRANS_BUFFERS;
172
173         /* multixact.c needs two SLRU areas */
174         numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
175
176         /*
177          * Add any requested by loadable modules; for backwards-compatibility
178          * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
179          * there are no explicit requests.
180          */
181         lock_addin_request_allowed = false;
182         numLocks += Max(lock_addin_request, NUM_USER_DEFINED_LWLOCKS);
183
184         return numLocks;
185 }
186
187
188 /*
189  * RequestAddinLWLocks
190  *              Request that extra LWLocks be allocated for use by
191  *              a loadable module.
192  *
193  * This is only useful if called from the _PG_init hook of a library that
194  * is loaded into the postmaster via shared_preload_libraries.  Once
195  * shared memory has been allocated, calls will be ignored.  (We could
196  * raise an error, but it seems better to make it a no-op, so that
197  * libraries containing such calls can be reloaded if needed.)
198  */
199 void
200 RequestAddinLWLocks(int n)
201 {
202         if (IsUnderPostmaster || !lock_addin_request_allowed)
203                 return;                                 /* too late */
204         lock_addin_request += n;
205 }
206
207
208 /*
209  * Compute shmem space needed for LWLocks.
210  */
211 Size
212 LWLockShmemSize(void)
213 {
214         Size            size;
215         int                     numLocks = NumLWLocks();
216
217         /* Space for the LWLock array. */
218         size = mul_size(numLocks, sizeof(LWLockPadded));
219
220         /* Space for dynamic allocation counter, plus room for alignment. */
221         size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
222
223         return size;
224 }
225
226
227 /*
228  * Allocate shmem space for LWLocks and initialize the locks.
229  */
230 void
231 CreateLWLocks(void)
232 {
233         int                     numLocks = NumLWLocks();
234         Size            spaceLocks = LWLockShmemSize();
235         LWLockPadded *lock;
236         int                *LWLockCounter;
237         char       *ptr;
238         int                     id;
239
240         /* Allocate space */
241         ptr = (char *) ShmemAlloc(spaceLocks);
242
243         /* Leave room for dynamic allocation counter */
244         ptr += 2 * sizeof(int);
245
246         /* Ensure desired alignment of LWLock array */
247         ptr += LWLOCK_PADDED_SIZE - ((unsigned long) ptr) % LWLOCK_PADDED_SIZE;
248
249         LWLockArray = (LWLockPadded *) ptr;
250
251         /*
252          * Initialize all LWLocks to "unlocked" state
253          */
254         for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
255         {
256                 SpinLockInit(&lock->lock.mutex);
257                 lock->lock.releaseOK = true;
258                 lock->lock.exclusive = 0;
259                 lock->lock.shared = 0;
260                 lock->lock.head = NULL;
261                 lock->lock.tail = NULL;
262         }
263
264         /*
265          * Initialize the dynamic-allocation counter, which is stored just before
266          * the first LWLock.
267          */
268         LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
269         LWLockCounter[0] = (int) NumFixedLWLocks;
270         LWLockCounter[1] = numLocks;
271 }
272
273
274 /*
275  * LWLockAssign - assign a dynamically-allocated LWLock number
276  *
277  * We interlock this using the same spinlock that is used to protect
278  * ShmemAlloc().  Interlocking is not really necessary during postmaster
279  * startup, but it is needed if any user-defined code tries to allocate
280  * LWLocks after startup.
281  */
282 LWLockId
283 LWLockAssign(void)
284 {
285         LWLockId        result;
286
287         /* use volatile pointer to prevent code rearrangement */
288         volatile int *LWLockCounter;
289
290         LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
291         SpinLockAcquire(ShmemLock);
292         if (LWLockCounter[0] >= LWLockCounter[1])
293         {
294                 SpinLockRelease(ShmemLock);
295                 elog(ERROR, "no more LWLockIds available");
296         }
297         result = (LWLockId) (LWLockCounter[0]++);
298         SpinLockRelease(ShmemLock);
299         return result;
300 }
301
302
303 /*
304  * LWLockAcquire - acquire a lightweight lock in the specified mode
305  *
306  * If the lock is not available, sleep until it is.
307  *
308  * Side effect: cancel/die interrupts are held off until lock release.
309  */
310 void
311 LWLockAcquire(LWLockId lockid, LWLockMode mode)
312 {
313         volatile LWLock *lock = &(LWLockArray[lockid].lock);
314         PGPROC     *proc = MyProc;
315         bool            retry = false;
316         int                     extraWaits = 0;
317
318         PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
319
320 #ifdef LWLOCK_STATS
321         /* Set up local count state first time through in a given process */
322         if (counts_for_pid != MyProcPid)
323         {
324                 int                *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
325                 int                     numLocks = LWLockCounter[1];
326
327                 sh_acquire_counts = calloc(numLocks, sizeof(int));
328                 ex_acquire_counts = calloc(numLocks, sizeof(int));
329                 block_counts = calloc(numLocks, sizeof(int));
330                 counts_for_pid = MyProcPid;
331                 on_shmem_exit(print_lwlock_stats, 0);
332         }
333         /* Count lock acquisition attempts */
334         if (mode == LW_EXCLUSIVE)
335                 ex_acquire_counts[lockid]++;
336         else
337                 sh_acquire_counts[lockid]++;
338 #endif   /* LWLOCK_STATS */
339
340         /*
341          * We can't wait if we haven't got a PGPROC.  This should only occur
342          * during bootstrap or shared memory initialization.  Put an Assert here
343          * to catch unsafe coding practices.
344          */
345         Assert(!(proc == NULL && IsUnderPostmaster));
346
347         /* Ensure we will have room to remember the lock */
348         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
349                 elog(ERROR, "too many LWLocks taken");
350
351         /*
352          * Lock out cancel/die interrupts until we exit the code section protected
353          * by the LWLock.  This ensures that interrupts will not interfere with
354          * manipulations of data structures in shared memory.
355          */
356         HOLD_INTERRUPTS();
357
358         /*
359          * Loop here to try to acquire lock after each time we are signaled by
360          * LWLockRelease.
361          *
362          * NOTE: it might seem better to have LWLockRelease actually grant us the
363          * lock, rather than retrying and possibly having to go back to sleep. But
364          * in practice that is no good because it means a process swap for every
365          * lock acquisition when two or more processes are contending for the same
366          * lock.  Since LWLocks are normally used to protect not-very-long
367          * sections of computation, a process needs to be able to acquire and
368          * release the same lock many times during a single CPU time slice, even
369          * in the presence of contention.  The efficiency of being able to do that
370          * outweighs the inefficiency of sometimes wasting a process dispatch
371          * cycle because the lock is not free when a released waiter finally gets
372          * to run.      See pgsql-hackers archives for 29-Dec-01.
373          */
374         for (;;)
375         {
376                 bool            mustwait;
377
378                 /* Acquire mutex.  Time spent holding mutex should be short! */
379                 SpinLockAcquire(&lock->mutex);
380
381                 /* If retrying, allow LWLockRelease to release waiters again */
382                 if (retry)
383                         lock->releaseOK = true;
384
385                 /* If I can get the lock, do so quickly. */
386                 if (mode == LW_EXCLUSIVE)
387                 {
388                         if (lock->exclusive == 0 && lock->shared == 0)
389                         {
390                                 lock->exclusive++;
391                                 mustwait = false;
392                         }
393                         else
394                                 mustwait = true;
395                 }
396                 else
397                 {
398                         if (lock->exclusive == 0)
399                         {
400                                 lock->shared++;
401                                 mustwait = false;
402                         }
403                         else
404                                 mustwait = true;
405                 }
406
407                 if (!mustwait)
408                         break;                          /* got the lock */
409
410                 /*
411                  * Add myself to wait queue.
412                  *
413                  * If we don't have a PGPROC structure, there's no way to wait. This
414                  * should never occur, since MyProc should only be null during shared
415                  * memory initialization.
416                  */
417                 if (proc == NULL)
418                         elog(PANIC, "cannot wait without a PGPROC structure");
419
420                 proc->lwWaiting = true;
421                 proc->lwExclusive = (mode == LW_EXCLUSIVE);
422                 proc->lwWaitLink = NULL;
423                 if (lock->head == NULL)
424                         lock->head = proc;
425                 else
426                         lock->tail->lwWaitLink = proc;
427                 lock->tail = proc;
428
429                 /* Can release the mutex now */
430                 SpinLockRelease(&lock->mutex);
431
432                 /*
433                  * Wait until awakened.
434                  *
435                  * Since we share the process wait semaphore with the regular lock
436                  * manager and ProcWaitForSignal, and we may need to acquire an LWLock
437                  * while one of those is pending, it is possible that we get awakened
438                  * for a reason other than being signaled by LWLockRelease. If so,
439                  * loop back and wait again.  Once we've gotten the LWLock,
440                  * re-increment the sema by the number of additional signals received,
441                  * so that the lock manager or signal manager will see the received
442                  * signal when it next waits.
443                  */
444                 LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
445
446 #ifdef LWLOCK_STATS
447                 block_counts[lockid]++;
448 #endif
449
450                 PG_TRACE2(lwlock__startwait, lockid, mode);
451
452                 for (;;)
453                 {
454                         /* "false" means cannot accept cancel/die interrupt here. */
455                         PGSemaphoreLock(&proc->sem, false);
456                         if (!proc->lwWaiting)
457                                 break;
458                         extraWaits++;
459                 }
460
461                 PG_TRACE2(lwlock__endwait, lockid, mode);
462
463                 LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
464
465                 /* Now loop back and try to acquire lock again. */
466                 retry = true;
467         }
468
469         /* We are done updating shared state of the lock itself. */
470         SpinLockRelease(&lock->mutex);
471
472         PG_TRACE2(lwlock__acquire, lockid, mode);
473
474         /* Add lock to list of locks held by this backend */
475         held_lwlocks[num_held_lwlocks++] = lockid;
476
477         /*
478          * Fix the process wait semaphore's count for any absorbed wakeups.
479          */
480         while (extraWaits-- > 0)
481                 PGSemaphoreUnlock(&proc->sem);
482 }
483
484 /*
485  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
486  *
487  * If the lock is not available, return FALSE with no side-effects.
488  *
489  * If successful, cancel/die interrupts are held off until lock release.
490  */
491 bool
492 LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
493 {
494         volatile LWLock *lock = &(LWLockArray[lockid].lock);
495         bool            mustwait;
496
497         PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
498
499         /* Ensure we will have room to remember the lock */
500         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
501                 elog(ERROR, "too many LWLocks taken");
502
503         /*
504          * Lock out cancel/die interrupts until we exit the code section protected
505          * by the LWLock.  This ensures that interrupts will not interfere with
506          * manipulations of data structures in shared memory.
507          */
508         HOLD_INTERRUPTS();
509
510         /* Acquire mutex.  Time spent holding mutex should be short! */
511         SpinLockAcquire(&lock->mutex);
512
513         /* If I can get the lock, do so quickly. */
514         if (mode == LW_EXCLUSIVE)
515         {
516                 if (lock->exclusive == 0 && lock->shared == 0)
517                 {
518                         lock->exclusive++;
519                         mustwait = false;
520                 }
521                 else
522                         mustwait = true;
523         }
524         else
525         {
526                 if (lock->exclusive == 0)
527                 {
528                         lock->shared++;
529                         mustwait = false;
530                 }
531                 else
532                         mustwait = true;
533         }
534
535         /* We are done updating shared state of the lock itself. */
536         SpinLockRelease(&lock->mutex);
537
538         if (mustwait)
539         {
540                 /* Failed to get lock, so release interrupt holdoff */
541                 RESUME_INTERRUPTS();
542                 LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
543                 PG_TRACE2(lwlock__condacquire__fail, lockid, mode);
544         }
545         else
546         {
547                 /* Add lock to list of locks held by this backend */
548                 held_lwlocks[num_held_lwlocks++] = lockid;
549                 PG_TRACE2(lwlock__condacquire, lockid, mode);
550         }
551
552         return !mustwait;
553 }
554
555 /*
556  * LWLockRelease - release a previously acquired lock
557  */
558 void
559 LWLockRelease(LWLockId lockid)
560 {
561         volatile LWLock *lock = &(LWLockArray[lockid].lock);
562         PGPROC     *head;
563         PGPROC     *proc;
564         int                     i;
565
566         PRINT_LWDEBUG("LWLockRelease", lockid, lock);
567
568         /*
569          * Remove lock from list of locks held.  Usually, but not always, it will
570          * be the latest-acquired lock; so search array backwards.
571          */
572         for (i = num_held_lwlocks; --i >= 0;)
573         {
574                 if (lockid == held_lwlocks[i])
575                         break;
576         }
577         if (i < 0)
578                 elog(ERROR, "lock %d is not held", (int) lockid);
579         num_held_lwlocks--;
580         for (; i < num_held_lwlocks; i++)
581                 held_lwlocks[i] = held_lwlocks[i + 1];
582
583         /* Acquire mutex.  Time spent holding mutex should be short! */
584         SpinLockAcquire(&lock->mutex);
585
586         /* Release my hold on lock */
587         if (lock->exclusive > 0)
588                 lock->exclusive--;
589         else
590         {
591                 Assert(lock->shared > 0);
592                 lock->shared--;
593         }
594
595         /*
596          * See if I need to awaken any waiters.  If I released a non-last shared
597          * hold, there cannot be anything to do.  Also, do not awaken any waiters
598          * if someone has already awakened waiters that haven't yet acquired the
599          * lock.
600          */
601         head = lock->head;
602         if (head != NULL)
603         {
604                 if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
605                 {
606                         /*
607                          * Remove the to-be-awakened PGPROCs from the queue.  If the front
608                          * waiter wants exclusive lock, awaken him only. Otherwise awaken
609                          * as many waiters as want shared access.
610                          */
611                         proc = head;
612                         if (!proc->lwExclusive)
613                         {
614                                 while (proc->lwWaitLink != NULL &&
615                                            !proc->lwWaitLink->lwExclusive)
616                                         proc = proc->lwWaitLink;
617                         }
618                         /* proc is now the last PGPROC to be released */
619                         lock->head = proc->lwWaitLink;
620                         proc->lwWaitLink = NULL;
621                         /* prevent additional wakeups until retryer gets to run */
622                         lock->releaseOK = false;
623                 }
624                 else
625                 {
626                         /* lock is still held, can't awaken anything */
627                         head = NULL;
628                 }
629         }
630
631         /* We are done updating shared state of the lock itself. */
632         SpinLockRelease(&lock->mutex);
633
634         PG_TRACE1(lwlock__release, lockid);
635
636         /*
637          * Awaken any waiters I removed from the queue.
638          */
639         while (head != NULL)
640         {
641                 LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
642                 proc = head;
643                 head = proc->lwWaitLink;
644                 proc->lwWaitLink = NULL;
645                 proc->lwWaiting = false;
646                 PGSemaphoreUnlock(&proc->sem);
647         }
648
649         /*
650          * Now okay to allow cancel/die interrupts.
651          */
652         RESUME_INTERRUPTS();
653 }
654
655
656 /*
657  * LWLockReleaseAll - release all currently-held locks
658  *
659  * Used to clean up after ereport(ERROR). An important difference between this
660  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
661  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
662  * has been set to an appropriate level earlier in error recovery. We could
663  * decrement it below zero if we allow it to drop for each released lock!
664  */
665 void
666 LWLockReleaseAll(void)
667 {
668         while (num_held_lwlocks > 0)
669         {
670                 HOLD_INTERRUPTS();              /* match the upcoming RESUME_INTERRUPTS */
671
672                 LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
673         }
674 }
675
676
677 /*
678  * LWLockHeldByMe - test whether my process currently holds a lock
679  *
680  * This is meant as debug support only.  We do not distinguish whether the
681  * lock is held shared or exclusive.
682  */
683 bool
684 LWLockHeldByMe(LWLockId lockid)
685 {
686         int                     i;
687
688         for (i = 0; i < num_held_lwlocks; i++)
689         {
690                 if (held_lwlocks[i] == lockid)
691                         return true;
692         }
693         return false;
694 }