/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * an LWLock to protect its shared state.
 *
 *
 * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.15 2003/06/11 22:37:45 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/clog.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"


typedef struct LWLock
{
	slock_t		mutex;			/* Protects LWLock and queue of PGPROCs */
	bool		releaseOK;		/* T if ok to release waiters */
	char		exclusive;		/* # of exclusive holders (0 or 1) */
	int			shared;			/* # of shared holders (0..MaxBackends) */
	PGPROC	   *head;			/* head of list of waiting PGPROCs */
	PGPROC	   *tail;			/* tail of list of waiting PGPROCs */
	/* tail is undefined when head is NULL */
} LWLock;

/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster.  LWLockIds are indexes into
 * the array.
 */
static LWLock *LWLockArray = NULL;

/* shared counter for dynamic allocation of LWLockIds */
static int *LWLockCounter;


/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	100

static int	num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];


#ifdef LOCK_DEBUG
bool		Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
			 where, (int) lockid,
			 (int) lock->exclusive, lock->shared, lock->head,
			 (int) lock->releaseOK);
}

inline static void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}

#else							/* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */


/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
	int			numLocks;

	/*
	 * Possibly this logic should be spread out among the affected
	 * modules, the same way that shmem space estimation is done.  But for
	 * now, there are few enough users of LWLocks that we can get away
	 * with just keeping the knowledge here.
	 */

	/* Predefined LWLocks */
	numLocks = (int) NumFixedLWLocks;

	/* bufmgr.c needs two for each shared buffer */
	numLocks += 2 * NBuffers;

	/* clog.c needs one per CLOG buffer + one control lock */
	numLocks += NUM_CLOG_BUFFERS + 1;

	/* Perhaps create a few more for use by user-defined modules? */

	return numLocks;
}
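
/*
 * Worked example (illustrative only, not used by the code): assume, for the
 * sake of the arithmetic, NBuffers = 1000 and NUM_CLOG_BUFFERS = 8; the real
 * values come from configuration and access/clog.h.  The formula above then
 * yields
 *
 *		numLocks = NumFixedLWLocks + 2 * 1000 + (8 + 1)
 *				 = NumFixedLWLocks + 2009
 */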


/*
 * Compute shmem space needed for LWLocks.
 */
int
LWLockShmemSize(void)
{
	int			numLocks = NumLWLocks();
	uint32		spaceLocks;

	/* Allocate the LWLocks plus space for shared allocation counter. */
	spaceLocks = numLocks * sizeof(LWLock) + 2 * sizeof(int);
	spaceLocks = MAXALIGN(spaceLocks);

	return (int) spaceLocks;
}


/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
	int			numLocks = NumLWLocks();
	uint32		spaceLocks = LWLockShmemSize();
	LWLock	   *lock;
	int			id;

	/* Allocate space */
	LWLockArray = (LWLock *) ShmemAlloc(spaceLocks);

	/*
	 * Initialize all LWLocks to "unlocked" state
	 */
	for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
	{
		SpinLockInit(&lock->mutex);
		lock->releaseOK = true;
		lock->exclusive = 0;
		lock->shared = 0;
		lock->head = NULL;
		lock->tail = NULL;
	}

	/*
	 * Initialize the dynamic-allocation counter at the end of the array
	 */
	LWLockCounter = (int *) lock;
	LWLockCounter[0] = (int) NumFixedLWLocks;
	LWLockCounter[1] = numLocks;
}


/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * NB: we do not currently try to interlock this.  Could perhaps use
 * ShmemLock spinlock if there were any need to assign LWLockIds after
 * shmem setup.
 */
LWLockId
LWLockAssign(void)
{
	if (LWLockCounter[0] >= LWLockCounter[1])
		elog(FATAL, "No more LWLockIds available");
	return (LWLockId) (LWLockCounter[0]++);
}
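
/*
 * Usage sketch (illustrative only, not part of this file): code that wants a
 * dynamically-assigned lock would typically call LWLockAssign() once during
 * shared-memory setup and remember the returned id for later use.  The names
 * MyModuleLock and MyModuleShmemInit are hypothetical.
 *
 *		static LWLockId MyModuleLock;
 *
 *		void
 *		MyModuleShmemInit(void)
 *		{
 *			MyModuleLock = LWLockAssign();
 *		}
 */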


/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
void
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = LWLockArray + lockid;
	PGPROC	   *proc = MyProc;
	bool		retry = false;
	int			extraWaits = 0;

	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

	/*
	 * We can't wait if we haven't got a PGPROC.  This should only occur
	 * during bootstrap or shared memory initialization.  Put an Assert
	 * here to catch unsafe coding practices.
	 */
	Assert(!(proc == NULL && IsUnderPostmaster));

	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Loop here to try to acquire lock after each time we are signaled by
	 * LWLockRelease.
	 *
	 * NOTE: it might seem better to have LWLockRelease actually grant us the
	 * lock, rather than retrying and possibly having to go back to sleep.
	 * But in practice that is no good because it means a process swap for
	 * every lock acquisition when two or more processes are contending
	 * for the same lock.  Since LWLocks are normally used to protect
	 * not-very-long sections of computation, a process needs to be able
	 * to acquire and release the same lock many times during a single CPU
	 * time slice, even in the presence of contention.  The efficiency of
	 * being able to do that outweighs the inefficiency of sometimes
	 * wasting a process dispatch cycle because the lock is not free when
	 * a released waiter finally gets to run.  See pgsql-hackers archives
	 * for 29-Dec-01.
	 */
	for (;;)
	{
		bool		mustwait;

		/* Acquire mutex.  Time spent holding mutex should be short! */
		SpinLockAcquire_NoHoldoff(&lock->mutex);

		/* If retrying, allow LWLockRelease to release waiters again */
		if (retry)
			lock->releaseOK = true;

		/* If I can get the lock, do so quickly. */
		if (mode == LW_EXCLUSIVE)
		{
			if (lock->exclusive == 0 && lock->shared == 0)
			{
				lock->exclusive++;
				mustwait = false;
			}
			else
				mustwait = true;
		}
		else
		{
			if (lock->exclusive == 0)
			{
				lock->shared++;
				mustwait = false;
			}
			else
				mustwait = true;
		}

		if (!mustwait)
			break;				/* got the lock */

		/*
		 * Add myself to wait queue.
		 *
		 * If we don't have a PGPROC structure, there's no way to wait. This
		 * should never occur, since MyProc should only be null during
		 * shared memory initialization.
		 */
		if (proc == NULL)
			elog(FATAL, "LWLockAcquire: can't wait without a PGPROC structure");

		proc->lwWaiting = true;
		proc->lwExclusive = (mode == LW_EXCLUSIVE);
		proc->lwWaitLink = NULL;
		if (lock->head == NULL)
			lock->head = proc;
		else
			lock->tail->lwWaitLink = proc;
		lock->tail = proc;

		/* Can release the mutex now */
		SpinLockRelease_NoHoldoff(&lock->mutex);

		/*
		 * Wait until awakened.
		 *
		 * Since we share the process wait semaphore with the regular lock
		 * manager and ProcWaitForSignal, and we may need to acquire an
		 * LWLock while one of those is pending, it is possible that we
		 * get awakened for a reason other than being signaled by
		 * LWLockRelease.  If so, loop back and wait again.  Once we've
		 * gotten the LWLock, re-increment the sema by the number of
		 * additional signals received, so that the lock manager or signal
		 * manager will see the received signal when it next waits.
		 */
		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");

		for (;;)
		{
			/* "false" means cannot accept cancel/die interrupt here. */
			PGSemaphoreLock(&proc->sem, false);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");

		/* Now loop back and try to acquire lock again. */
		retry = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/* Add lock to list of locks held by this backend */
	Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
	held_lwlocks[num_held_lwlocks++] = lockid;

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(&proc->sem);
}
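
/*
 * Usage sketch (illustrative only, not part of this file): the intended
 * calling pattern is a short critical section bracketed by LWLockAcquire
 * and LWLockRelease.  MyModuleLock, SharedCounter, and localCopy are
 * hypothetical names, not things defined elsewhere in the backend.
 *
 *		LWLockAcquire(MyModuleLock, LW_EXCLUSIVE);
 *		SharedCounter++;				(modify shared state)
 *		LWLockRelease(MyModuleLock);
 *
 *		LWLockAcquire(MyModuleLock, LW_SHARED);
 *		localCopy = SharedCounter;		(read-only access)
 *		LWLockRelease(MyModuleLock);
 */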

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = LWLockArray + lockid;
	bool		mustwait;

	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);

	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* If I can get the lock, do so quickly. */
	if (mode == LW_EXCLUSIVE)
	{
		if (lock->exclusive == 0 && lock->shared == 0)
		{
			lock->exclusive++;
			mustwait = false;
		}
		else
			mustwait = true;
	}
	else
	{
		if (lock->exclusive == 0)
		{
			lock->shared++;
			mustwait = false;
		}
		else
			mustwait = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();
		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
	}
	else
	{
		/* Add lock to list of locks held by this backend */
		Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
		held_lwlocks[num_held_lwlocks++] = lockid;
	}

	return !mustwait;
}
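
/*
 * Usage sketch (illustrative only, not part of this file): a caller that
 * would rather skip some optional work than sleep can test the lock and
 * fall through when it is busy.  MyModuleLock and do_optional_work are
 * hypothetical names.
 *
 *		if (LWLockConditionalAcquire(MyModuleLock, LW_EXCLUSIVE))
 *		{
 *			do_optional_work();
 *			LWLockRelease(MyModuleLock);
 *		}
 *
 * If the acquire fails the caller simply moves on; unlike LWLockAcquire,
 * no interrupt holdoff is left in place in that case.
 */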

/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLockId lockid)
{
	volatile LWLock *lock = LWLockArray + lockid;
	PGPROC	   *head;
	PGPROC	   *proc;
	int			i;

	PRINT_LWDEBUG("LWLockRelease", lockid, lock);

	/*
	 * Remove lock from list of locks held.  Usually, but not always, it
	 * will be the latest-acquired lock; so search array backwards.
	 */
	for (i = num_held_lwlocks; --i >= 0;)
	{
		if (lockid == held_lwlocks[i])
			break;
	}
	if (i < 0)
		elog(ERROR, "LWLockRelease: lock %d is not held", (int) lockid);
	num_held_lwlocks--;
	for (; i < num_held_lwlocks; i++)
		held_lwlocks[i] = held_lwlocks[i + 1];

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* Release my hold on lock */
	if (lock->exclusive > 0)
		lock->exclusive--;
	else
	{
		Assert(lock->shared > 0);
		lock->shared--;
	}

	/*
	 * See if I need to awaken any waiters.  If I released a non-last
	 * shared hold, there cannot be anything to do.  Also, do not awaken
	 * any waiters if someone has already awakened waiters that haven't
	 * yet acquired the lock.
	 */
	head = lock->head;
	if (head != NULL)
	{
		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
		{
			/*
			 * Remove the to-be-awakened PGPROCs from the queue.  If the
			 * front waiter wants exclusive lock, awaken him only.
			 * Otherwise awaken as many waiters as want shared access.
			 */
			proc = head;
			if (!proc->lwExclusive)
			{
				while (proc->lwWaitLink != NULL &&
					   !proc->lwWaitLink->lwExclusive)
					proc = proc->lwWaitLink;
			}
			/* proc is now the last PGPROC to be released */
			lock->head = proc->lwWaitLink;
			proc->lwWaitLink = NULL;
			/* prevent additional wakeups until retryer gets to run */
			lock->releaseOK = false;
		}
		else
		{
			/* lock is still held, can't awaken anything */
			head = NULL;
		}
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/*
	 * Awaken any waiters I removed from the queue.
	 */
	while (head != NULL)
	{
		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
		proc = head;
		head = proc->lwWaitLink;
		proc->lwWaitLink = NULL;
		proc->lwWaiting = false;
		PGSemaphoreUnlock(&proc->sem);
	}

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after elog(ERROR).  An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery.  We could
 * decrement it below zero if we allowed it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
	while (num_held_lwlocks > 0)
	{
		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */

		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
	}
}