]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/lwlock.c
Allocate a few extra LWLocks for possible use by add-on modules.
[postgresql] / src / backend / storage / lmgr / lwlock.c
1 /*-------------------------------------------------------------------------
2  *
3  * lwlock.c
4  *        Lightweight lock manager
5  *
6  * Lightweight locks are intended primarily to provide mutual exclusion of
7  * access to shared-memory data structures.  Therefore, they offer both
8  * exclusive and shared lock modes (to support read/write and read-only
9  * access to a shared object).  There are few other frammishes.  User-level
10  * locking should be done with the full lock manager --- which depends on
11  * an LWLock to protect its shared state.
12  *
13  *
14  * Portions Copyright (c) 1996-2005, PostgreSQL Global Development Group
15  * Portions Copyright (c) 1994, Regents of the University of California
16  *
17  * IDENTIFICATION
18  *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.31 2005/10/07 20:11:03 tgl Exp $
19  *
20  *-------------------------------------------------------------------------
21  */
22 #include "postgres.h"
23
24 #include "access/slru.h"
25 #include "storage/lwlock.h"
26 #include "storage/proc.h"
27 #include "storage/spin.h"
28
29
30 typedef struct LWLock
31 {
32         slock_t         mutex;                  /* Protects LWLock and queue of PGPROCs */
33         bool            releaseOK;              /* T if ok to release waiters */
34         char            exclusive;              /* # of exclusive holders (0 or 1) */
35         int                     shared;                 /* # of shared holders (0..MaxBackends) */
36         PGPROC     *head;                       /* head of list of waiting PGPROCs */
37         PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
38         /* tail is undefined when head is NULL */
39 } LWLock;
40
41 /*
42  * All the LWLock structs are allocated as an array in shared memory.
43  * (LWLockIds are indexes into the array.)  We force the array stride to
44  * be a power of 2, which saves a few cycles in indexing, but more
45  * importantly also ensures that individual LWLocks don't cross cache line
46  * boundaries.  This reduces cache contention problems, especially on AMD
47  * Opterons.  (Of course, we have to also ensure that the array start
48  * address is suitably aligned.)
49  *
50  * LWLock is between 16 and 32 bytes on all known platforms, so these two
51  * cases are sufficient.
52  */
53 #define LWLOCK_PADDED_SIZE      (sizeof(LWLock) <= 16 ? 16 : 32)
54
55 typedef union LWLockPadded
56 {
57         LWLock          lock;
58         char            pad[LWLOCK_PADDED_SIZE];
59 } LWLockPadded;
60
61 /*
62  * This points to the array of LWLocks in shared memory.  Backends inherit
63  * the pointer by fork from the postmaster (except in the EXEC_BACKEND case,
64  * where we have special measures to pass it down).
65  */
66 NON_EXEC_STATIC LWLockPadded *LWLockArray = NULL;
67
68 /* shared counter for dynamic allocation of LWLockIds */
69 static int *LWLockCounter;
70
71
72 /*
73  * We use this structure to keep track of locked LWLocks for release
74  * during error recovery.  The maximum size could be determined at runtime
75  * if necessary, but it seems unlikely that more than a few locks could
76  * ever be held simultaneously.
77  */
78 #define MAX_SIMUL_LWLOCKS       100
79
80 static int      num_held_lwlocks = 0;
81 static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];
82
83
84 #ifdef LOCK_DEBUG
85 bool            Trace_lwlocks = false;
86
87 inline static void
88 PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
89 {
90         if (Trace_lwlocks)
91                 elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
92                          where, (int) lockid,
93                          (int) lock->exclusive, lock->shared, lock->head,
94                          (int) lock->releaseOK);
95 }
96
97 inline static void
98 LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
99 {
100         if (Trace_lwlocks)
101                 elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
102 }
103
104 #else                                                   /* not LOCK_DEBUG */
105 #define PRINT_LWDEBUG(a,b,c)
106 #define LOG_LWDEBUG(a,b,c)
107 #endif   /* LOCK_DEBUG */
108
109
110 /*
111  * Compute number of LWLocks to allocate.
112  */
113 int
114 NumLWLocks(void)
115 {
116         int                     numLocks;
117
118         /*
119          * Possibly this logic should be spread out among the affected
120          * modules, the same way that shmem space estimation is done.  But for
121          * now, there are few enough users of LWLocks that we can get away
122          * with just keeping the knowledge here.
123          */
124
125         /* Predefined LWLocks */
126         numLocks = (int) NumFixedLWLocks;
127
128         /* bufmgr.c needs two for each shared buffer */
129         numLocks += 2 * NBuffers;
130
131         /* clog.c needs one per CLOG buffer */
132         numLocks += NUM_SLRU_BUFFERS;
133
134         /* subtrans.c needs one per SubTrans buffer */
135         numLocks += NUM_SLRU_BUFFERS;
136
137         /*
138          * multixact.c needs one per MultiXact buffer, but there are
139          * two SLRU areas for MultiXact
140          */
141         numLocks += 2 * NUM_SLRU_BUFFERS;
142
143         /* Leave a few extra for use by user-defined modules. */
144         numLocks += NUM_USER_DEFINED_LWLOCKS;
145
146         return numLocks;
147 }
148
149
150 /*
151  * Compute shmem space needed for LWLocks.
152  */
153 Size
154 LWLockShmemSize(void)
155 {
156         Size            size;
157         int                     numLocks = NumLWLocks();
158
159         /* Space for the LWLock array. */
160         size = mul_size(numLocks, sizeof(LWLockPadded));
161
162         /* Space for shared allocation counter, plus room for alignment. */
163         size = add_size(size, 2 * sizeof(int) + LWLOCK_PADDED_SIZE);
164
165         return size;
166 }
167
168
169 /*
170  * Allocate shmem space for LWLocks and initialize the locks.
171  */
172 void
173 CreateLWLocks(void)
174 {
175         int                     numLocks = NumLWLocks();
176         Size            spaceLocks = LWLockShmemSize();
177         LWLockPadded *lock;
178         char       *ptr;
179         int                     id;
180
181         /* Allocate space */
182         ptr = (char *) ShmemAlloc(spaceLocks);
183
184         /* Ensure desired alignment of LWLock array */
185         ptr += LWLOCK_PADDED_SIZE - ((unsigned long) ptr) % LWLOCK_PADDED_SIZE;
186
187         LWLockArray = (LWLockPadded *) ptr;
188
189         /*
190          * Initialize all LWLocks to "unlocked" state
191          */
192         for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
193         {
194                 SpinLockInit(&lock->lock.mutex);
195                 lock->lock.releaseOK = true;
196                 lock->lock.exclusive = 0;
197                 lock->lock.shared = 0;
198                 lock->lock.head = NULL;
199                 lock->lock.tail = NULL;
200         }
201
202         /*
203          * Initialize the dynamic-allocation counter at the end of the array
204          */
205         LWLockCounter = (int *) lock;
206         LWLockCounter[0] = (int) NumFixedLWLocks;
207         LWLockCounter[1] = numLocks;
208 }
209
210
211 /*
212  * LWLockAssign - assign a dynamically-allocated LWLock number
213  *
214  * NB: we do not currently try to interlock this.  Could perhaps use
215  * ShmemLock spinlock if there were any need to assign LWLockIds after
216  * shmem setup.
217  */
218 LWLockId
219 LWLockAssign(void)
220 {
221         if (LWLockCounter[0] >= LWLockCounter[1])
222                 elog(FATAL, "no more LWLockIds available");
223         return (LWLockId) (LWLockCounter[0]++);
224 }
225
226
227 /*
228  * LWLockAcquire - acquire a lightweight lock in the specified mode
229  *
230  * If the lock is not available, sleep until it is.
231  *
232  * Side effect: cancel/die interrupts are held off until lock release.
233  */
234 void
235 LWLockAcquire(LWLockId lockid, LWLockMode mode)
236 {
237         volatile LWLock *lock = &(LWLockArray[lockid].lock);
238         PGPROC     *proc = MyProc;
239         bool            retry = false;
240         int                     extraWaits = 0;
241
242         PRINT_LWDEBUG("LWLockAcquire", lockid, lock);
243
244         /*
245          * We can't wait if we haven't got a PGPROC.  This should only occur
246          * during bootstrap or shared memory initialization.  Put an Assert
247          * here to catch unsafe coding practices.
248          */
249         Assert(!(proc == NULL && IsUnderPostmaster));
250
251         /* Ensure we will have room to remember the lock */
252         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
253                 elog(ERROR, "too many LWLocks taken");
254
255         /*
256          * Lock out cancel/die interrupts until we exit the code section
257          * protected by the LWLock.  This ensures that interrupts will not
258          * interfere with manipulations of data structures in shared memory.
259          */
260         HOLD_INTERRUPTS();
261
262         /*
263          * Loop here to try to acquire lock after each time we are signaled by
264          * LWLockRelease.
265          *
266          * NOTE: it might seem better to have LWLockRelease actually grant us the
267          * lock, rather than retrying and possibly having to go back to sleep.
268          * But in practice that is no good because it means a process swap for
269          * every lock acquisition when two or more processes are contending
270          * for the same lock.  Since LWLocks are normally used to protect
271          * not-very-long sections of computation, a process needs to be able
272          * to acquire and release the same lock many times during a single CPU
273          * time slice, even in the presence of contention.      The efficiency of
274          * being able to do that outweighs the inefficiency of sometimes
275          * wasting a process dispatch cycle because the lock is not free when
276          * a released waiter finally gets to run.  See pgsql-hackers archives
277          * for 29-Dec-01.
278          */
279         for (;;)
280         {
281                 bool            mustwait;
282
283                 /* Acquire mutex.  Time spent holding mutex should be short! */
284                 SpinLockAcquire_NoHoldoff(&lock->mutex);
285
286                 /* If retrying, allow LWLockRelease to release waiters again */
287                 if (retry)
288                         lock->releaseOK = true;
289
290                 /* If I can get the lock, do so quickly. */
291                 if (mode == LW_EXCLUSIVE)
292                 {
293                         if (lock->exclusive == 0 && lock->shared == 0)
294                         {
295                                 lock->exclusive++;
296                                 mustwait = false;
297                         }
298                         else
299                                 mustwait = true;
300                 }
301                 else
302                 {
303                         if (lock->exclusive == 0)
304                         {
305                                 lock->shared++;
306                                 mustwait = false;
307                         }
308                         else
309                                 mustwait = true;
310                 }
311
312                 if (!mustwait)
313                         break;                          /* got the lock */
314
315                 /*
316                  * Add myself to wait queue.
317                  *
318                  * If we don't have a PGPROC structure, there's no way to wait. This
319                  * should never occur, since MyProc should only be null during
320                  * shared memory initialization.
321                  */
322                 if (proc == NULL)
323                         elog(FATAL, "cannot wait without a PGPROC structure");
324
325                 proc->lwWaiting = true;
326                 proc->lwExclusive = (mode == LW_EXCLUSIVE);
327                 proc->lwWaitLink = NULL;
328                 if (lock->head == NULL)
329                         lock->head = proc;
330                 else
331                         lock->tail->lwWaitLink = proc;
332                 lock->tail = proc;
333
334                 /* Can release the mutex now */
335                 SpinLockRelease_NoHoldoff(&lock->mutex);
336
337                 /*
338                  * Wait until awakened.
339                  *
340                  * Since we share the process wait semaphore with the regular lock
341                  * manager and ProcWaitForSignal, and we may need to acquire an
342                  * LWLock while one of those is pending, it is possible that we
343                  * get awakened for a reason other than being signaled by
344                  * LWLockRelease. If so, loop back and wait again.      Once we've
345                  * gotten the LWLock, re-increment the sema by the number of
346                  * additional signals received, so that the lock manager or signal
347                  * manager will see the received signal when it next waits.
348                  */
349                 LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");
350
351                 for (;;)
352                 {
353                         /* "false" means cannot accept cancel/die interrupt here. */
354                         PGSemaphoreLock(&proc->sem, false);
355                         if (!proc->lwWaiting)
356                                 break;
357                         extraWaits++;
358                 }
359
360                 LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");
361
362                 /* Now loop back and try to acquire lock again. */
363                 retry = true;
364         }
365
366         /* We are done updating shared state of the lock itself. */
367         SpinLockRelease_NoHoldoff(&lock->mutex);
368
369         /* Add lock to list of locks held by this backend */
370         held_lwlocks[num_held_lwlocks++] = lockid;
371
372         /*
373          * Fix the process wait semaphore's count for any absorbed wakeups.
374          */
375         while (extraWaits-- > 0)
376                 PGSemaphoreUnlock(&proc->sem);
377 }
378
379 /*
380  * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
381  *
382  * If the lock is not available, return FALSE with no side-effects.
383  *
384  * If successful, cancel/die interrupts are held off until lock release.
385  */
386 bool
387 LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
388 {
389         volatile LWLock *lock = &(LWLockArray[lockid].lock);
390         bool            mustwait;
391
392         PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);
393
394         /* Ensure we will have room to remember the lock */
395         if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS)
396                 elog(ERROR, "too many LWLocks taken");
397
398         /*
399          * Lock out cancel/die interrupts until we exit the code section
400          * protected by the LWLock.  This ensures that interrupts will not
401          * interfere with manipulations of data structures in shared memory.
402          */
403         HOLD_INTERRUPTS();
404
405         /* Acquire mutex.  Time spent holding mutex should be short! */
406         SpinLockAcquire_NoHoldoff(&lock->mutex);
407
408         /* If I can get the lock, do so quickly. */
409         if (mode == LW_EXCLUSIVE)
410         {
411                 if (lock->exclusive == 0 && lock->shared == 0)
412                 {
413                         lock->exclusive++;
414                         mustwait = false;
415                 }
416                 else
417                         mustwait = true;
418         }
419         else
420         {
421                 if (lock->exclusive == 0)
422                 {
423                         lock->shared++;
424                         mustwait = false;
425                 }
426                 else
427                         mustwait = true;
428         }
429
430         /* We are done updating shared state of the lock itself. */
431         SpinLockRelease_NoHoldoff(&lock->mutex);
432
433         if (mustwait)
434         {
435                 /* Failed to get lock, so release interrupt holdoff */
436                 RESUME_INTERRUPTS();
437                 LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
438         }
439         else
440         {
441                 /* Add lock to list of locks held by this backend */
442                 held_lwlocks[num_held_lwlocks++] = lockid;
443         }
444
445         return !mustwait;
446 }
447
448 /*
449  * LWLockRelease - release a previously acquired lock
450  */
451 void
452 LWLockRelease(LWLockId lockid)
453 {
454         volatile LWLock *lock = &(LWLockArray[lockid].lock);
455         PGPROC     *head;
456         PGPROC     *proc;
457         int                     i;
458
459         PRINT_LWDEBUG("LWLockRelease", lockid, lock);
460
461         /*
462          * Remove lock from list of locks held.  Usually, but not always, it
463          * will be the latest-acquired lock; so search array backwards.
464          */
465         for (i = num_held_lwlocks; --i >= 0;)
466         {
467                 if (lockid == held_lwlocks[i])
468                         break;
469         }
470         if (i < 0)
471                 elog(ERROR, "lock %d is not held", (int) lockid);
472         num_held_lwlocks--;
473         for (; i < num_held_lwlocks; i++)
474                 held_lwlocks[i] = held_lwlocks[i + 1];
475
476         /* Acquire mutex.  Time spent holding mutex should be short! */
477         SpinLockAcquire_NoHoldoff(&lock->mutex);
478
479         /* Release my hold on lock */
480         if (lock->exclusive > 0)
481                 lock->exclusive--;
482         else
483         {
484                 Assert(lock->shared > 0);
485                 lock->shared--;
486         }
487
488         /*
489          * See if I need to awaken any waiters.  If I released a non-last
490          * shared hold, there cannot be anything to do.  Also, do not awaken
491          * any waiters if someone has already awakened waiters that haven't
492          * yet acquired the lock.
493          */
494         head = lock->head;
495         if (head != NULL)
496         {
497                 if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
498                 {
499                         /*
500                          * Remove the to-be-awakened PGPROCs from the queue.  If the
501                          * front waiter wants exclusive lock, awaken him only.
502                          * Otherwise awaken as many waiters as want shared access.
503                          */
504                         proc = head;
505                         if (!proc->lwExclusive)
506                         {
507                                 while (proc->lwWaitLink != NULL &&
508                                            !proc->lwWaitLink->lwExclusive)
509                                         proc = proc->lwWaitLink;
510                         }
511                         /* proc is now the last PGPROC to be released */
512                         lock->head = proc->lwWaitLink;
513                         proc->lwWaitLink = NULL;
514                         /* prevent additional wakeups until retryer gets to run */
515                         lock->releaseOK = false;
516                 }
517                 else
518                 {
519                         /* lock is still held, can't awaken anything */
520                         head = NULL;
521                 }
522         }
523
524         /* We are done updating shared state of the lock itself. */
525         SpinLockRelease_NoHoldoff(&lock->mutex);
526
527         /*
528          * Awaken any waiters I removed from the queue.
529          */
530         while (head != NULL)
531         {
532                 LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
533                 proc = head;
534                 head = proc->lwWaitLink;
535                 proc->lwWaitLink = NULL;
536                 proc->lwWaiting = false;
537                 PGSemaphoreUnlock(&proc->sem);
538         }
539
540         /*
541          * Now okay to allow cancel/die interrupts.
542          */
543         RESUME_INTERRUPTS();
544 }
545
546
547 /*
548  * LWLockReleaseAll - release all currently-held locks
549  *
550  * Used to clean up after ereport(ERROR). An important difference between this
551  * function and retail LWLockRelease calls is that InterruptHoldoffCount is
552  * unchanged by this operation.  This is necessary since InterruptHoldoffCount
553  * has been set to an appropriate level earlier in error recovery. We could
554  * decrement it below zero if we allow it to drop for each released lock!
555  */
556 void
557 LWLockReleaseAll(void)
558 {
559         while (num_held_lwlocks > 0)
560         {
561                 HOLD_INTERRUPTS();              /* match the upcoming RESUME_INTERRUPTS */
562
563                 LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
564         }
565 }
566
567
568 /*
569  * LWLockHeldByMe - test whether my process currently holds a lock
570  *
571  * This is meant as debug support only.  We do not distinguish whether the
572  * lock is held shared or exclusive.
573  */
574 bool
575 LWLockHeldByMe(LWLockId lockid)
576 {
577         int                     i;
578
579         for (i = 0; i < num_held_lwlocks; i++)
580         {
581                 if (held_lwlocks[i] == lockid)
582                         return true;
583         }
584         return false;
585 }