]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lwlock.c
Add some optional code (conditionally compiled under #ifdef LWLOCK_STATS)
[postgresql] / src / backend / storage / lmgr / lwlock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lwlock.c
4  *        Lightweight lock manager
5  *
6  * Lightweight locks are intended primarily to provide mutual exclusion of
7  * access to shared-memory data structures.  Therefore, they offer both
8  * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object).  There are few other frammishes.  User-level
10  * locking should be done with the full lock manager --- which depends on
11  * LWLocks to protect its shared state.
12  *
13  *
14  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
15  * Portions Copyright (c) 1994, Regents of the University of California
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.39 2006/04/21 16:45:12 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include "access/clog.h"
25 #include "access/multixact.h"
26 #include "access/subtrans.h"
27 #include "miscadmin.h"
28 #include "storage/ipc.h"
29 #include "storage/lwlock.h"
30 #include "storage/proc.h"
31 #include "storage/spin.h"
32
33
34 /* We use the ShmemLock spinlock to protect LWLockAssign */
35 extern slock_t *ShmemLock;
36
37
38 typedef struct LWLock
39 {
40         slock_t         mutex;                  /* Protects LWLock and queue of PGPROCs */
41         bool            releaseOK;              /* T if ok to release waiters */
42         char            exclusive;              /* # of exclusive holders (0 or 1) */
43         int                     shared;                 /* # of shared holders (0..MaxBackends) */
44         PGPROC     *head;                       /* head of list of waiting PGPROCs */
45         PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
46         /* tail is undefined when head is NULL */
47 } LWLock;
48
49 /*
50  * All the LWLock structs are allocated as an array in shared memory.
51  * (LWLockIds are indexes into the array.)      We force the array stride to
52  * be a power of 2, which saves a few cycles in indexing, but more
53  * importantly also ensures that individual LWLocks don't cross cache line
54  * boundaries.  This reduces cache contention problems, especially on AMD
55  * Opterons.  (Of course, we have to also ensure that the array start
56  * address is suitably aligned.)
57  *
58  * LWLock is between 16 and 32 bytes on all known platforms, so these two
59  * cases are sufficient.
60  */
61 #define LWLOCK_PADDED_SIZE      (sizeof(LWLock) <= 16 ? 16 : 32)
62
63 typedef union LWLockPadded
64 {
65         LWLock          lock;
66         char            pad[LWLOCK_PADDED_SIZE];
67 } LWLockPadded;
68
69 /*
70  * This points to the array of LWLocks in shared memory.  Backends inherit
71  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
72  * where we have special measures to pass it down).
73  */
74 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
75
76
77 /*
78  * We use this structure to keep track of locked LWLocks for release
79  * during error recovery.  The maximum size could be determined at runtime
80  * if necessary, but it seems unlikely that more than a few locks could
81  * ever be held simultaneously.
82  */
83 #define MAX_SIMUL_LWLOCKS       100
84
85 static int      num_held_lwlocks = 0;
86 static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
87
88 #ifdef LWLOCK_STATS
89 static int      counts_for_pid = 0;
90 static int *sh_acquire_counts;
91 static int *ex_acquire_counts;
92 static int *block_counts;
93 #endif
94
95
96 #ifdef LOCK_DEBUG
97 bool            Trace_lwlocks = false;
98
99 inline static void
100 PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
101 {
102         if (Trace_lwlocks)
103                 elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
104                          where, (int) lockid,
105                          (int) lock->exclusive, lock->shared, lock->head,
106                          (int) lock->releaseOK);
107 }
108
109 inline static void
110 LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
111 {
112         if (Trace_lwlocks)
113                 elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
114 }
115 #else                                                   /* not LOCK_DEBUG */
116 #define PRINT_LWDEBUG(a,b,c)
117 #define LOG_LWDEBUG(a,b,c)
118 #endif   /* LOCK_DEBUG */
119
120 #ifdef LWLOCK_STATS
121
122 static void
123 print_lwlock_stats(int code, Datum arg)
124 {
125         int                     i;
126         int                *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
127         int                     numLocks = LWLockCounter[1];
128
129         /* Grab an LWLock to keep different backends from mixing reports */
130         LWLockAcquire(0, LW_EXCLUSIVE);
131
132         for (i = 0; i < numLocks; i++)
133         {
134                 if (sh_acquire_counts[i] || ex_acquire_counts[i] || block_counts[i])
135                         fprintf(stderr, "PID %d lwlock %d: shacq %u exacq %u blk %u\n",
136                                         MyProcPid, i, sh_acquire_counts[i], ex_acquire_counts[i],
137                                         block_counts[i]);
138         }
139
140         LWLockRelease(0);
141 }
142
143 #endif /* LWLOCK_STATS */
144
145
146 /*
147  * Compute number of LWLocks to allocate.
148  */
149 int
150 NumLWLocks(void)
151 {
152         int                     numLocks;
153
154         /*
155          * Possibly this logic should be spread out among the affected modules,
156          * the same way that shmem space estimation is done.  But for now, there
157          * are few enough users of LWLocks that we can get away with just keeping
158          * the knowledge here.
159          */
160
161         /* Predefined LWLocks */
162         numLocks = (int) FirstLockMgrLock;
163
164         /* lock.c gets the ones starting at FirstLockMgrLock */
165         numLocks += NUM_LOCK_PARTITIONS;
166
167         /* bufmgr.c needs two for each shared buffer */
168         numLocks += 2 * NBuffers;
169
170         /* clog.c needs one per CLOG buffer */
171         numLocks += NUM_CLOG_BUFFERS;
172
173         /* subtrans.c needs one per SubTrans buffer */
174         numLocks += NUM_SUBTRANS_BUFFERS;
175
176         /* multixact.c needs two SLRU areas */
177         numLocks += NUM_MXACTOFFSET_BUFFERS + NUM_MXACTMEMBER_BUFFERS;
178
179         /* Leave a few extra for use by user-defined modules. */
180         numLocks += NUM_USER_DEFINED_LWLOCKS;
181
182         return numLocks;
183 }
184
185
186 /*
187  * Compute shmem space needed for LWLocks.
188  */
189 Size
190 LWLockShmemSize(void)
191 {
192         Size            size;
193         int                     numLocks = NumLWLocks();
194
195         /* Space for the LWLock array. */
196         size = mul_size(numLocks, sizeof(LWLockPadded));
197
198         /* Space for dynamic allocation counter, plus room for alignment. */
199         size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
200
201         return size;
202 }
203
204
205 /*
206  * Allocate shmem space for LWLocks and initialize the locks.
207  */
208 void
209 CreateLWLocks(void)
210 {
211         int                     numLocks = NumLWLocks();
212         Size            spaceLocks = LWLockShmemSize();
213         LWLockPadded *lock;
214         int                *LWLockCounter;
215         char       *ptr;
216         int                     id;
217
218         /* Allocate space */
219         ptr = (char *) ShmemAlloc(spaceLocks);
220
221         /* Leave room for dynamic allocation counter */
222         ptr += 2 * sizeof(int);
223
224         /* Ensure desired alignment of LWLock array */
225         ptr += LWLOCK_PADDED_SIZE - ((unsigned long) ptr) % LWLOCK_PADDED_SIZE;
226
227         LWLockArray = (LWLockPadded *) ptr;
228
229         /*
230          * Initialize all LWLocks to "unlocked" state
231          */
232         for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
233         {
234                 SpinLockInit(&lock->lock.mutex);
235                 lock->lock.releaseOK = true;
236                 lock->lock.exclusive = 0;
237                 lock->lock.shared = 0;
238                 lock->lock.head = NULL;
239                 lock->lock.tail = NULL;
240         }
241
242         /*
243          * Initialize the dynamic-allocation counter, which is stored just before
244          * the first LWLock.  The LWLocks used by lock.c are not dynamically
245          * allocated, it just assumes it has them.
246          */
247         LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
248         LWLockCounter[0] = (int) FirstLockMgrLock + NUM_LOCK_PARTITIONS;
249         LWLockCounter[1] = numLocks;
250 }
251
252
253 /*
254  * LWLockAssign - assign a dynamically-allocated LWLock number
255  *
256  * We interlock this using the same spinlock that is used to protect
257  * ShmemAlloc().  Interlocking is not really necessary during postmaster
258  * startup, but it is needed if any user-defined code tries to allocate
259  * LWLocks after startup.
260  */
261 LWLockId
262 LWLockAssign(void)
263 {
264         LWLockId        result;
265
266         /* use volatile pointer to prevent code rearrangement */
267         volatile int *LWLockCounter;
268
269         LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
270         SpinLockAcquire(ShmemLock);
271         if (LWLockCounter[0] >= LWLockCounter[1])
272         {
273                 SpinLockRelease(ShmemLock);
274                 elog(ERROR, "no more LWLockIds available");
275         }
276         result = (LWLockId) (LWLockCounter[0]++);
277         SpinLockRelease(ShmemLock);
278         return result;
279 }
280
281
282 /*
283  * LWLockAcquire - acquire a lightweight lock in the specified mode
284  *
285  * If the lock is not available, sleep until it is.
286  *
287  * Side effect: cancel/die interrupts are held off until lock release.
288  */
289 void
290 LWLockAcquire(LWLockId lockid, LWLockMode mode)
291 {
292         volatile LWLock *lock = &(LWLockArray[lockid].lock);
293         PGPROC     *proc = MyProc;
294         bool            retry = false;
295         int                     extraWaits = 0;
296
297         PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
298
299 #ifdef LWLOCK_STATS
300         /* Set up local count state first time through in a given process */
301         if (counts_for_pid != MyProcPid)
302         {
303                 int        *LWLockCounter = (int *) ((char *) LWLockArray - 2 * sizeof(int));
304                 int             numLocks = LWLockCounter[1];
305
306                 sh_acquire_counts = calloc(numLocks, sizeof(int));
307                 ex_acquire_counts = calloc(numLocks, sizeof(int));
308                 block_counts = calloc(numLocks, sizeof(int));
309                 counts_for_pid = MyProcPid;
310                 on_shmem_exit(print_lwlock_stats, 0);
311         }
312         /* Count lock acquisition attempts */
313         if (mode == LW_EXCLUSIVE)
314                 ex_acquire_counts[lockid]++;
315         else
316                 sh_acquire_counts[lockid]++;
317 #endif /* LWLOCK_STATS */
318
319         /*
320          * We can't wait if we haven't got a PGPROC.  This should only occur
321          * during bootstrap or shared memory initialization.  Put an Assert here
322          * to catch unsafe coding practices.
323          */
324         Assert(!(proc == NULL && IsUnderPostmaster));
325
326         /* Ensure we will have room to remember the lock */
327         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
328                 elog(ERROR, "too many LWLocks taken");
329
330         /*
331          * Lock out cancel/die interrupts until we exit the code section protected
332          * by the LWLock.  This ensures that interrupts will not interfere with
333          * manipulations of data structures in shared memory.
334          */
335         HOLD_INTERRUPTS();
336
337         /*
338          * Loop here to try to acquire lock after each time we are signaled by
339          * LWLockRelease.
340          *
341          * NOTE: it might seem better to have LWLockRelease actually grant us the
342          * lock, rather than retrying and possibly having to go back to sleep. But
343          * in practice that is no good because it means a process swap for every
344          * lock acquisition when two or more processes are contending for the same
345          * lock.  Since LWLocks are normally used to protect not-very-long
346          * sections of computation, a process needs to be able to acquire and
347          * release the same lock many times during a single CPU time slice, even
348          * in the presence of contention.  The efficiency of being able to do that
349          * outweighs the inefficiency of sometimes wasting a process dispatch
350          * cycle because the lock is not free when a released waiter finally gets
351          * to run.      See pgsql-hackers archives for 29-Dec-01.
352          */
353         for (;;)
354         {
355                 bool            mustwait;
356
357                 /* Acquire mutex.  Time spent holding mutex should be short! */
358                 SpinLockAcquire(&lock->mutex);
359
360                 /* If retrying, allow LWLockRelease to release waiters again */
361                 if (retry)
362                         lock->releaseOK = true;
363
364                 /* If I can get the lock, do so quickly. */
365                 if (mode == LW_EXCLUSIVE)
366                 {
367                         if (lock->exclusive == 0 && lock->shared == 0)
368                         {
369                                 lock->exclusive++;
370                                 mustwait = false;
371                         }
372                         else
373                                 mustwait = true;
374                 }
375                 else
376                 {
377                         if (lock->exclusive == 0)
378                         {
379                                 lock->shared++;
380                                 mustwait = false;
381                         }
382                         else
383                                 mustwait = true;
384                 }
385
386                 if (!mustwait)
387                         break;                          /* got the lock */
388
389                 /*
390                  * Add myself to wait queue.
391                  *
392                  * If we don't have a PGPROC structure, there's no way to wait. This
393                  * should never occur, since MyProc should only be null during shared
394                  * memory initialization.
395                  */
396                 if (proc == NULL)
397                         elog(PANIC, "cannot wait without a PGPROC structure");
398
399                 proc->lwWaiting = true;
400                 proc->lwExclusive = (mode == LW_EXCLUSIVE);
401                 proc->lwWaitLink = NULL;
402                 if (lock->head == NULL)
403                         lock->head = proc;
404                 else
405                         lock->tail->lwWaitLink = proc;
406                 lock->tail = proc;
407
408                 /* Can release the mutex now */
409                 SpinLockRelease(&lock->mutex);
410
411                 /*
412                  * Wait until awakened.
413                  *
414                  * Since we share the process wait semaphore with the regular lock
415                  * manager and ProcWaitForSignal, and we may need to acquire an LWLock
416                  * while one of those is pending, it is possible that we get awakened
417                  * for a reason other than being signaled by LWLockRelease. If so,
418                  * loop back and wait again.  Once we've gotten the LWLock,
419                  * re-increment the sema by the number of additional signals received,
420                  * so that the lock manager or signal manager will see the received
421                  * signal when it next waits.
422                  */
423                 LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
424
425 #ifdef LWLOCK_STATS
426                 block_counts[lockid]++;
427 #endif
428
429                 for (;;)
430                 {
431                         /* "false" means cannot accept cancel/die interrupt here. */
432                         PGSemaphoreLock(&proc->sem, false);
433                         if (!proc->lwWaiting)
434                                 break;
435                         extraWaits++;
436                 }
437
438                 LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
439
440                 /* Now loop back and try to acquire lock again. */
441                 retry = true;
442         }
443
444         /* We are done updating shared state of the lock itself. */
445         SpinLockRelease(&lock->mutex);
446
447         /* Add lock to list of locks held by this backend */
448         held_lwlocks[num_held_lwlocks++] = lockid;
449
450         /*
451          * Fix the process wait semaphore's count for any absorbed wakeups.
452          */
453         while (extraWaits-- > 0)
454                 PGSemaphoreUnlock(&proc->sem);
455 }
456
457 /*
458  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
459  *
460  * If the lock is not available, return FALSE with no side-effects.
461  *
462  * If successful, cancel/die interrupts are held off until lock release.
463  */
464 bool
465 LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
466 {
467         volatile LWLock *lock = &(LWLockArray[lockid].lock);
468         bool            mustwait;
469
470         PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
471
472         /* Ensure we will have room to remember the lock */
473         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
474                 elog(ERROR, "too many LWLocks taken");
475
476         /*
477          * Lock out cancel/die interrupts until we exit the code section protected
478          * by the LWLock.  This ensures that interrupts will not interfere with
479          * manipulations of data structures in shared memory.
480          */
481         HOLD_INTERRUPTS();
482
483         /* Acquire mutex.  Time spent holding mutex should be short! */
484         SpinLockAcquire(&lock->mutex);
485
486         /* If I can get the lock, do so quickly. */
487         if (mode == LW_EXCLUSIVE)
488         {
489                 if (lock->exclusive == 0 && lock->shared == 0)
490                 {
491                         lock->exclusive++;
492                         mustwait = false;
493                 }
494                 else
495                         mustwait = true;
496         }
497         else
498         {
499                 if (lock->exclusive == 0)
500                 {
501                         lock->shared++;
502                         mustwait = false;
503                 }
504                 else
505                         mustwait = true;
506         }
507
508         /* We are done updating shared state of the lock itself. */
509         SpinLockRelease(&lock->mutex);
510
511         if (mustwait)
512         {
513                 /* Failed to get lock, so release interrupt holdoff */
514                 RESUME_INTERRUPTS();
515                 LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
516         }
517         else
518         {
519                 /* Add lock to list of locks held by this backend */
520                 held_lwlocks[num_held_lwlocks++] = lockid;
521         }
522
523         return !mustwait;
524 }
525
526 /*
527  * LWLockRelease - release a previously acquired lock
528  */
529 void
530 LWLockRelease(LWLockId lockid)
531 {
532         volatile LWLock *lock = &(LWLockArray[lockid].lock);
533         PGPROC     *head;
534         PGPROC     *proc;
535         int                     i;
536
537         PRINT_LWDEBUG("LWLockRelease", lockid, lock);
538
539         /*
540          * Remove lock from list of locks held.  Usually, but not always, it will
541          * be the latest-acquired lock; so search array backwards.
542          */
543         for (i = num_held_lwlocks; --i >= 0;)
544         {
545                 if (lockid == held_lwlocks[i])
546                         break;
547         }
548         if (i < 0)
549                 elog(ERROR, "lock %d is not held", (int) lockid);
550         num_held_lwlocks--;
551         for (; i < num_held_lwlocks; i++)
552                 held_lwlocks[i] = held_lwlocks[i + 1];
553
554         /* Acquire mutex.  Time spent holding mutex should be short! */
555         SpinLockAcquire(&lock->mutex);
556
557         /* Release my hold on lock */
558         if (lock->exclusive > 0)
559                 lock->exclusive--;
560         else
561         {
562                 Assert(lock->shared > 0);
563                 lock->shared--;
564         }
565
566         /*
567          * See if I need to awaken any waiters.  If I released a non-last shared
568          * hold, there cannot be anything to do.  Also, do not awaken any waiters
569          * if someone has already awakened waiters that haven't yet acquired the
570          * lock.
571          */
572         head = lock->head;
573         if (head != NULL)
574         {
575                 if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
576                 {
577                         /*
578                          * Remove the to-be-awakened PGPROCs from the queue.  If the front
579                          * waiter wants exclusive lock, awaken him only. Otherwise awaken
580                          * as many waiters as want shared access.
581                          */
582                         proc = head;
583                         if (!proc->lwExclusive)
584                         {
585                                 while (proc->lwWaitLink != NULL &&
586                                            !proc->lwWaitLink->lwExclusive)
587                                         proc = proc->lwWaitLink;
588                         }
589                         /* proc is now the last PGPROC to be released */
590                         lock->head = proc->lwWaitLink;
591                         proc->lwWaitLink = NULL;
592                         /* prevent additional wakeups until retryer gets to run */
593                         lock->releaseOK = false;
594                 }
595                 else
596                 {
597                         /* lock is still held, can't awaken anything */
598                         head = NULL;
599                 }
600         }
601
602         /* We are done updating shared state of the lock itself. */
603         SpinLockRelease(&lock->mutex);
604
605         /*
606          * Awaken any waiters I removed from the queue.
607          */
608         while (head != NULL)
609         {
610                 LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
611                 proc = head;
612                 head = proc->lwWaitLink;
613                 proc->lwWaitLink = NULL;
614                 proc->lwWaiting = false;
615                 PGSemaphoreUnlock(&proc->sem);
616         }
617
618         /*
619          * Now okay to allow cancel/die interrupts.
620          */
621         RESUME_INTERRUPTS();
622 }
623
624
625 /*
626  * LWLockReleaseAll - release all currently-held locks
627  *
628  * Used to clean up after ereport(ERROR). An important difference between this
629  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
630  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
631  * has been set to an appropriate level earlier in error recovery. We could
632  * decrement it below zero if we allow it to drop for each released lock!
633  */
634 void
635 LWLockReleaseAll(void)
636 {
637         while (num_held_lwlocks > 0)
638         {
639                 HOLD_INTERRUPTS();              /* match the upcoming RESUME_INTERRUPTS */
640
641                 LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
642         }
643 }
644
645
646 /*
647  * LWLockHeldByMe - test whether my process currently holds a lock
648  *
649  * This is meant as debug support only.  We do not distinguish whether the
650  * lock is held shared or exclusive.
651  */
652 bool
653 LWLockHeldByMe(LWLockId lockid)
654 {
655         int                     i;
656
657         for (i = 0; i < num_held_lwlocks; i++)
658         {
659                 if (held_lwlocks[i] == lockid)
660                         return true;
661         }
662         return false;
663 }