]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lwlock.c
Update copyright for 2009.
[postgresql] / src / backend / storage / lmgr / lwlock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lwlock.c
4  *        Lightweight lock manager
5  *
6  * Lightweight locks are intended primarily to provide mutual exclusion of
7  * access to shared-memory data structures.  Therefore, they offer both
8  * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object).  There are few other frammishes.  User-level
10  * locking should be done with the full lock manager --- which depends on
11  * LWLocks to protect its shared state.
12  *
13  *
14  * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
15  * Portions Copyright (c) 1994, Regents of the University of California
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.53 2009/01/01 17:23:48 momjian Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include "access/clog.h"
25 #include "access/multixact.h"
26 #include "access/subtrans.h"
27 #include "miscadmin.h"
28 #include "pg_trace.h"
29 #include "storage/ipc.h"
30 #include "storage/proc.h"
31 #include "storage/spin.h"
32
33
34 /* We use the ShmemLock spinlock to protect LWLockAssign */
35 extern slock_t *ShmemLock;
36
37
38 typedef struct LWLock
39 {
40         slock_t         mutex;                  /* Protects LWLock and queue of PGPROCs */
41         bool            releaseOK;              /* T if ok to release waiters */
42         char            exclusive;              /* # of exclusive holders (0 or 1) */
43         int                     shared;                 /* # of shared holders (0..MaxBackends) */
44         PGPROC     *head;                       /* head of list of waiting PGPROCs */
45         PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
46         /* tail is undefined when head is NULL */
47 } LWLock;
48
49 /*
50  * All the LWLock structs are allocated as an array in shared memory.
51  * (LWLockIds are indexes into the array.)      We force the array stride to
52  * be a power of 2, which saves a few cycles in indexing, but more
53  * importantly also ensures that individual LWLocks don't cross cache line
54  * boundaries.  This reduces cache contention problems, especially on AMD
55  * Opterons.  (Of course, we have to also ensure that the array start
56  * address is suitably aligned.)
57  *
58  * LWLock is between 16 and 32 bytes on all known platforms, so these two
59  * cases are sufficient.
60  */
61 #define LWLOCK_PADDED_SIZE      (sizeof(LWLock) <= 16 ? 16 : 32)
62
63 typedef union LWLockPadded
64 {
65         LWLock          lock;
66         char            pad[LWLOCK_PADDED_SIZE];
67 } LWLockPadded;
68
69 /*
70  * This points to the array of LWLocks in shared memory.  Backends inherit
71  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
72  * where we have special measures to pass it down).
73  */
74 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
75
76
77 /*
78  * We use this structure to keep track of locked LWLocks for release
79  * during error recovery.  The maximum size could be determined at runtime
80  * if necessary, but it seems unlikely that more than a few locks could
81  * ever be held simultaneously.
82  */
83 #define MAX_SIMUL_LWLOCKS       100
84
85 static int      num_held_lwlocks = 0;
86 static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
87
88 static int      lock_addin_request = 0;
89 static bool lock_addin_request_allowed = true;
90
91 #ifdef LWLOCK_STATS
92 static int      counts_for_pid = 0;
93 static int *sh_acquire_counts;
94 static int *ex_acquire_counts;
95 static int *block_counts;
96 #endif
97
98 #ifdef LOCK_DEBUG
99 bool            Trace_lwlocks = false;
100
101 inline static void
102 PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
103 {
104         if (Trace_lwlocks)
105                 elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
106                          where, (int) lockid,
107                          (int) lock->exclusive, lock->shared, lock->head,
108                          (int) lock->releaseOK);
109 }
110
111 inline static void
112 LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
113 {
114         if (Trace_lwlocks)
115                 elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
116 }
117 #else                                                   /* not LOCK_DEBUG */
118 #define PRINT_LWDEBUG(a,b,c)
119 #define LOG_LWDEBUG(a,b,c)
120 #endif   /* LOCK_DEBUG */
121
122 #ifdef LWLOCK_STATS
123
124 static void
125 print_lwlock_stats(int code, Datum arg)
126 {
127         int                     i;
128         int                *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
129         int                     numLocks = LWLockCounter[1];
130
131         /* Grab an LWLock to keep different backends from mixing reports */
132         LWLockAcquire(0, LW_EXCLUSIVE);
133
134         for (i = 0; i < numLocks; i++)
135         {
136                 if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i])
137                         fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u\n",
138                                         MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
139                                         block_counts[i]);
140         }
141
142         LWLockRelease(0);
143 }
144 #endif   /* LWLOCK_STATS */
145
146
147 /*
148  * Compute number of LWLocks to allocate.
149  */
150 int
151 NumLWLocks(void)
152 {
153         int                     numLocks;
154
155         /*
156          * Possibly this logic should be spread out among the affected modules,
157          * the same way that shmem space estimation is done.  But for now, there
158          * are few enough users of LWLocks that we can get away with just keeping
159          * the knowledge here.
160          */
161
162         /* Predefined LWLocks */
163         numLocks = (int) NumFixedLWLocks;
164
165         /* bufmgr.c needs two for each shared buffer */
166         numLocks += 2 * NBuffers;
167
168         /* clog.c needs one per CLOG buffer */
169         numLocks += NUM_CLOG_BUFFERS;
170
171         /* subtrans.c needs one per SubTrans buffer */
172         numLocks += NUM_SUBTRANS_BUFFERS;
173
174         /* multixact.c needs two SLRU areas */
175         numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
176
177         /*
178          * Add any requested by loadable modules; for backwards-compatibility
179          * reasons, allocate at least NUM_USER_DEFINED_LWLOCKS of them even if
180          * there are no explicit requests.
181          */
182         lock_addin_request_allowed = false;
183         numLocks += Max(lock_addin_request, NUM_USER_DEFINED_LWLOCKS);
184
185         return numLocks;
186 }
187
188
189 /*
190  * RequestAddinLWLocks
191  *              Request that extra LWLocks be allocated for use by
192  *              a loadable module.
193  *
194  * This is only useful if called from the _PG_init hook of a library that
195  * is loaded into the postmaster via shared_preload_libraries.  Once
196  * shared memory has been allocated, calls will be ignored.  (We could
197  * raise an error, but it seems better to make it a no-op, so that
198  * libraries containing such calls can be reloaded if needed.)
199  */
200 void
201 RequestAddinLWLocks(int n)
202 {
203         if (IsUnderPostmaster || !lock_addin_request_allowed)
204                 return;                                 /* too late */
205         lock_addin_request += n;
206 }
207
208
209 /*
210  * Compute shmem space needed for LWLocks.
211  */
212 Size
213 LWLockShmemSize(void)
214 {
215         Size            size;
216         int                     numLocks = NumLWLocks();
217
218         /* Space for the LWLock array. */
219         size = mul_size(numLocks, sizeof(LWLockPadded));
220
221         /* Space for dynamic allocation counter, plus room for alignment. */
222         size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
223
224         return size;
225 }
226
227
228 /*
229  * Allocate shmem space for LWLocks and initialize the locks.
230  */
231 void
232 CreateLWLocks(void)
233 {
234         int                     numLocks = NumLWLocks();
235         Size            spaceLocks = LWLockShmemSize();
236         LWLockPadded *lock;
237         int                *LWLockCounter;
238         char       *ptr;
239         int                     id;
240
241         /* Allocate space */
242         ptr = (char *) ShmemAlloc(spaceLocks);
243
244         /* Leave room for dynamic allocation counter */
245         ptr += 2 * sizeof(int);
246
247         /* Ensure desired alignment of LWLock array */
248         ptr += LWLOCK_PADDED_SIZE - ((unsigned long) ptr) % LWLOCK_PADDED_SIZE;
249
250         LWLockArray = (LWLockPadded *) ptr;
251
252         /*
253          * Initialize all LWLocks to "unlocked" state
254          */
255         for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
256         {
257                 SpinLockInit(&lock->lock.mutex);
258                 lock->lock.releaseOK = true;
259                 lock->lock.exclusive = 0;
260                 lock->lock.shared = 0;
261                 lock->lock.head = NULL;
262                 lock->lock.tail = NULL;
263         }
264
265         /*
266          * Initialize the dynamic-allocation counter, which is stored just before
267          * the first LWLock.
268          */
269         LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
270         LWLockCounter[0] = (int) NumFixedLWLocks;
271         LWLockCounter[1] = numLocks;
272 }
273
274
275 /*
276  * LWLockAssign - assign a dynamically-allocated LWLock number
277  *
278  * We interlock this using the same spinlock that is used to protect
279  * ShmemAlloc().  Interlocking is not really necessary during postmaster
280  * startup, but it is needed if any user-defined code tries to allocate
281  * LWLocks after startup.
282  */
283 LWLockId
284 LWLockAssign(void)
285 {
286         LWLockId        result;
287
288         /* use volatile pointer to prevent code rearrangement */
289         volatile int *LWLockCounter;
290
291         LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
292         SpinLockAcquire(ShmemLock);
293         if (LWLockCounter[0] >= LWLockCounter[1])
294         {
295                 SpinLockRelease(ShmemLock);
296                 elog(ERROR, "no more LWLockIds available");
297         }
298         result = (LWLockId) (LWLockCounter[0]++);
299         SpinLockRelease(ShmemLock);
300         return result;
301 }
302
303
304 /*
305  * LWLockAcquire - acquire a lightweight lock in the specified mode
306  *
307  * If the lock is not available, sleep until it is.
308  *
309  * Side effect: cancel/die interrupts are held off until lock release.
310  */
311 void
312 LWLockAcquire(LWLockId lockid, LWLockMode mode)
313 {
314         volatile LWLock *lock = &(LWLockArray[lockid].lock);
315         PGPROC     *proc = MyProc;
316         bool            retry = false;
317         int                     extraWaits = 0;
318
319         PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
320
321 #ifdef LWLOCK_STATS
322         /* Set up local count state first time through in a given process */
323         if (counts_for_pid != MyProcPid)
324         {
325                 int                *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
326                 int                     numLocks = LWLockCounter[1];
327
328                 sh_acquire_counts = calloc(numLocks, sizeof(int));
329                 ex_acquire_counts = calloc(numLocks, sizeof(int));
330                 block_counts = calloc(numLocks, sizeof(int));
331                 counts_for_pid = MyProcPid;
332                 on_shmem_exit(print_lwlock_stats, 0);
333         }
334         /* Count lock acquisition attempts */
335         if (mode == LW_EXCLUSIVE)
336                 ex_acquire_counts[lockid]++;
337         else
338                 sh_acquire_counts[lockid]++;
339 #endif   /* LWLOCK_STATS */
340
341         /*
342          * We can't wait if we haven't got a PGPROC.  This should only occur
343          * during bootstrap or shared memory initialization.  Put an Assert here
344          * to catch unsafe coding practices.
345          */
346         Assert(!(proc == NULL && IsUnderPostmaster));
347
348         /* Ensure we will have room to remember the lock */
349         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
350                 elog(ERROR, "too many LWLocks taken");
351
352         /*
353          * Lock out cancel/die interrupts until we exit the code section protected
354          * by the LWLock.  This ensures that interrupts will not interfere with
355          * manipulations of data structures in shared memory.
356          */
357         HOLD_INTERRUPTS();
358
359         /*
360          * Loop here to try to acquire lock after each time we are signaled by
361          * LWLockRelease.
362          *
363          * NOTE: it might seem better to have LWLockRelease actually grant us the
364          * lock, rather than retrying and possibly having to go back to sleep. But
365          * in practice that is no good because it means a process swap for every
366          * lock acquisition when two or more processes are contending for the same
367          * lock.  Since LWLocks are normally used to protect not-very-long
368          * sections of computation, a process needs to be able to acquire and
369          * release the same lock many times during a single CPU time slice, even
370          * in the presence of contention.  The efficiency of being able to do that
371          * outweighs the inefficiency of sometimes wasting a process dispatch
372          * cycle because the lock is not free when a released waiter finally gets
373          * to run.      See pgsql-hackers archives for 29-Dec-01.
374          */
375         for (;;)
376         {
377                 bool            mustwait;
378
379                 /* Acquire mutex.  Time spent holding mutex should be short! */
380                 SpinLockAcquire(&lock->mutex);
381
382                 /* If retrying, allow LWLockRelease to release waiters again */
383                 if (retry)
384                         lock->releaseOK = true;
385
386                 /* If I can get the lock, do so quickly. */
387                 if (mode == LW_EXCLUSIVE)
388                 {
389                         if (lock->exclusive == 0 && lock->shared == 0)
390                         {
391                                 lock->exclusive++;
392                                 mustwait = false;
393                         }
394                         else
395                                 mustwait = true;
396                 }
397                 else
398                 {
399                         if (lock->exclusive == 0)
400                         {
401                                 lock->shared++;
402                                 mustwait = false;
403                         }
404                         else
405                                 mustwait = true;
406                 }
407
408                 if (!mustwait)
409                         break;                          /* got the lock */
410
411                 /*
412                  * Add myself to wait queue.
413                  *
414                  * If we don't have a PGPROC structure, there's no way to wait. This
415                  * should never occur, since MyProc should only be null during shared
416                  * memory initialization.
417                  */
418                 if (proc == NULL)
419                         elog(PANIC, "cannot wait without a PGPROC structure");
420
421                 proc->lwWaiting = true;
422                 proc->lwExclusive = (mode == LW_EXCLUSIVE);
423                 proc->lwWaitLink = NULL;
424                 if (lock->head == NULL)
425                         lock->head = proc;
426                 else
427                         lock->tail->lwWaitLink = proc;
428                 lock->tail = proc;
429
430                 /* Can release the mutex now */
431                 SpinLockRelease(&lock->mutex);
432
433                 /*
434                  * Wait until awakened.
435                  *
436                  * Since we share the process wait semaphore with the regular lock
437                  * manager and ProcWaitForSignal, and we may need to acquire an LWLock
438                  * while one of those is pending, it is possible that we get awakened
439                  * for a reason other than being signaled by LWLockRelease. If so,
440                  * loop back and wait again.  Once we've gotten the LWLock,
441                  * re-increment the sema by the number of additional signals received,
442                  * so that the lock manager or signal manager will see the received
443                  * signal when it next waits.
444                  */
445                 LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
446
447 #ifdef LWLOCK_STATS
448                 block_counts[lockid]++;
449 #endif
450
451                 TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode);
452
453                 for (;;)
454                 {
455                         /* "false" means cannot accept cancel/die interrupt here. */
456                         PGSemaphoreLock(&proc->sem, false);
457                         if (!proc->lwWaiting)
458                                 break;
459                         extraWaits++;
460                 }
461
462                 TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode);
463
464                 LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
465
466                 /* Now loop back and try to acquire lock again. */
467                 retry = true;
468         }
469
470         /* We are done updating shared state of the lock itself. */
471         SpinLockRelease(&lock->mutex);
472
473         TRACE_POSTGRESQL_LWLOCK_ACQUIRE(lockid, mode);
474
475         /* Add lock to list of locks held by this backend */
476         held_lwlocks[num_held_lwlocks++] = lockid;
477
478         /*
479          * Fix the process wait semaphore's count for any absorbed wakeups.
480          */
481         while (extraWaits-- > 0)
482                 PGSemaphoreUnlock(&proc->sem);
483 }
484
485 /*
486  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
487  *
488  * If the lock is not available, return FALSE with no side-effects.
489  *
490  * If successful, cancel/die interrupts are held off until lock release.
491  */
492 bool
493 LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
494 {
495         volatile LWLock *lock = &(LWLockArray[lockid].lock);
496         bool            mustwait;
497
498         PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
499
500         /* Ensure we will have room to remember the lock */
501         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
502                 elog(ERROR, "too many LWLocks taken");
503
504         /*
505          * Lock out cancel/die interrupts until we exit the code section protected
506          * by the LWLock.  This ensures that interrupts will not interfere with
507          * manipulations of data structures in shared memory.
508          */
509         HOLD_INTERRUPTS();
510
511         /* Acquire mutex.  Time spent holding mutex should be short! */
512         SpinLockAcquire(&lock->mutex);
513
514         /* If I can get the lock, do so quickly. */
515         if (mode == LW_EXCLUSIVE)
516         {
517                 if (lock->exclusive == 0 && lock->shared == 0)
518                 {
519                         lock->exclusive++;
520                         mustwait = false;
521                 }
522                 else
523                         mustwait = true;
524         }
525         else
526         {
527                 if (lock->exclusive == 0)
528                 {
529                         lock->shared++;
530                         mustwait = false;
531                 }
532                 else
533                         mustwait = true;
534         }
535
536         /* We are done updating shared state of the lock itself. */
537         SpinLockRelease(&lock->mutex);
538
539         if (mustwait)
540         {
541                 /* Failed to get lock, so release interrupt holdoff */
542                 RESUME_INTERRUPTS();
543                 LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
544                 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE_FAIL(lockid, mode);
545         }
546         else
547         {
548                 /* Add lock to list of locks held by this backend */
549                 held_lwlocks[num_held_lwlocks++] = lockid;
550                 TRACE_POSTGRESQL_LWLOCK_CONDACQUIRE(lockid, mode);
551         }
552
553         return !mustwait;
554 }
555
556 /*
557  * LWLockRelease - release a previously acquired lock
558  */
559 void
560 LWLockRelease(LWLockId lockid)
561 {
562         volatile LWLock *lock = &(LWLockArray[lockid].lock);
563         PGPROC     *head;
564         PGPROC     *proc;
565         int                     i;
566
567         PRINT_LWDEBUG("LWLockRelease", lockid, lock);
568
569         /*
570          * Remove lock from list of locks held.  Usually, but not always, it will
571          * be the latest-acquired lock; so search array backwards.
572          */
573         for (i = num_held_lwlocks; --i >= 0;)
574         {
575                 if (lockid == held_lwlocks[i])
576                         break;
577         }
578         if (i < 0)
579                 elog(ERROR, "lock %d is not held", (int) lockid);
580         num_held_lwlocks--;
581         for (; i < num_held_lwlocks; i++)
582                 held_lwlocks[i] = held_lwlocks[i + 1];
583
584         /* Acquire mutex.  Time spent holding mutex should be short! */
585         SpinLockAcquire(&lock->mutex);
586
587         /* Release my hold on lock */
588         if (lock->exclusive > 0)
589                 lock->exclusive--;
590         else
591         {
592                 Assert(lock->shared > 0);
593                 lock->shared--;
594         }
595
596         /*
597          * See if I need to awaken any waiters.  If I released a non-last shared
598          * hold, there cannot be anything to do.  Also, do not awaken any waiters
599          * if someone has already awakened waiters that haven't yet acquired the
600          * lock.
601          */
602         head = lock->head;
603         if (head != NULL)
604         {
605                 if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
606                 {
607                         /*
608                          * Remove the to-be-awakened PGPROCs from the queue.  If the front
609                          * waiter wants exclusive lock, awaken him only. Otherwise awaken
610                          * as many waiters as want shared access.
611                          */
612                         proc = head;
613                         if (!proc->lwExclusive)
614                         {
615                                 while (proc->lwWaitLink != NULL &&
616                                            !proc->lwWaitLink->lwExclusive)
617                                         proc = proc->lwWaitLink;
618                         }
619                         /* proc is now the last PGPROC to be released */
620                         lock->head = proc->lwWaitLink;
621                         proc->lwWaitLink = NULL;
622                         /* prevent additional wakeups until retryer gets to run */
623                         lock->releaseOK = false;
624                 }
625                 else
626                 {
627                         /* lock is still held, can't awaken anything */
628                         head = NULL;
629                 }
630         }
631
632         /* We are done updating shared state of the lock itself. */
633         SpinLockRelease(&lock->mutex);
634
635         TRACE_POSTGRESQL_LWLOCK_RELEASE(lockid);
636
637         /*
638          * Awaken any waiters I removed from the queue.
639          */
640         while (head != NULL)
641         {
642                 LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
643                 proc = head;
644                 head = proc->lwWaitLink;
645                 proc->lwWaitLink = NULL;
646                 proc->lwWaiting = false;
647                 PGSemaphoreUnlock(&proc->sem);
648         }
649
650         /*
651          * Now okay to allow cancel/die interrupts.
652          */
653         RESUME_INTERRUPTS();
654 }
655
656
657 /*
658  * LWLockReleaseAll - release all currently-held locks
659  *
660  * Used to clean up after ereport(ERROR). An important difference between this
661  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
662  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
663  * has been set to an appropriate level earlier in error recovery. We could
664  * decrement it below zero if we allow it to drop for each released lock!
665  */
666 void
667 LWLockReleaseAll(void)
668 {
669         while (num_held_lwlocks > 0)
670         {
671                 HOLD_INTERRUPTS();              /* match the upcoming RESUME_INTERRUPTS */
672
673                 LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
674         }
675 }
676
677
678 /*
679  * LWLockHeldByMe - test whether my process currently holds a lock
680  *
681  * This is meant as debug support only.  We do not distinguish whether the
682  * lock is held shared or exclusive.
683  */
684 bool
685 LWLockHeldByMe(LWLockId lockid)
686 {
687         int                     i;
688
689         for (i = 0; i < num_held_lwlocks; i++)
690         {
691                 if (held_lwlocks[i] == lockid)
692                         return true;
693         }
694         return false;
695 }