/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *        Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * an LWLock to protect its shared state.
 *
 *
 * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *        $PostgreSQL: pgsql/src/backend/storage/lmgr/lwlock.c,v 1.21 2004/07/01 00:50:59 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/clog.h"
#include "access/subtrans.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"


typedef struct LWLock
{
        slock_t         mutex;                  /* Protects LWLock and queue of PGPROCs */
        bool            releaseOK;              /* T if ok to release waiters */
        char            exclusive;              /* # of exclusive holders (0 or 1) */
        int                     shared;                 /* # of shared holders (0..MaxBackends) */
        PGPROC     *head;                       /* head of list of waiting PGPROCs */
        PGPROC     *tail;                       /* tail of list of waiting PGPROCs */
        /* tail is undefined when head is NULL */
} LWLock;

/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster.  LWLockIds are indexes into
 * the array.
 */
NON_EXEC_STATIC LWLock *LWLockArray = NULL;

/* shared counter for dynamic allocation of LWLockIds */
static int *LWLockCounter;


/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS       100

static int      num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];


#ifdef LOCK_DEBUG
bool            Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
        if (Trace_lwlocks)
                elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
                         where, (int) lockid,
                         (int) lock->exclusive, lock->shared, lock->head,
                         (int) lock->releaseOK);
}

inline static void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
        if (Trace_lwlocks)
                elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}

#else                                                   /* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */


/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
        int                     numLocks;

        /*
         * Possibly this logic should be spread out among the affected
         * modules, the same way that shmem space estimation is done.  But for
         * now, there are few enough users of LWLocks that we can get away
         * with just keeping the knowledge here.
         */

        /* Predefined LWLocks */
        numLocks = (int) NumFixedLWLocks;

        /* bufmgr.c needs two for each shared buffer */
        numLocks += 2 * NBuffers;

        /* clog.c needs one per CLOG buffer + one control lock */
        numLocks += NUM_CLOG_BUFFERS + 1;

        /* subtrans.c needs one per SubTrans buffer + one control lock */
        numLocks += NUM_SUBTRANS_BUFFERS + 1;

        /* Perhaps create a few more for use by user-defined modules? */

        return numLocks;
}
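
/*
 * Worked example (purely illustrative; the real values come from the
 * build configuration): if, say, NBuffers = 1000, NUM_CLOG_BUFFERS = 8
 * and NUM_SUBTRANS_BUFFERS = 32, the total comes to
 * NumFixedLWLocks + 2000 + 9 + 33 locks.
 */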


/*
 * Compute shmem space needed for LWLocks.
 */
int
LWLockShmemSize(void)
{
        int                     numLocks = NumLWLocks();
        uint32          spaceLocks;

        /* Allocate the LWLocks plus space for shared allocation counter. */
        spaceLocks = numLocks * sizeof(LWLock) + 2 * sizeof(int);
        spaceLocks = MAXALIGN(spaceLocks);

        return (int) spaceLocks;
}
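
/*
 * Illustrative picture of the layout that CreateLWLocks() below makes of
 * this space: the locks themselves, then the two ints counted above.
 *
 *              LWLockArray[0 .. numLocks-1]    the LWLock structs
 *              LWLockCounter[0]                next LWLockId to assign
 *              LWLockCounter[1]                total number of LWLockIds
 */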


/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
        int                     numLocks = NumLWLocks();
        uint32          spaceLocks = LWLockShmemSize();
        LWLock     *lock;
        int                     id;

        /* Allocate space */
        LWLockArray = (LWLock *) ShmemAlloc(spaceLocks);

        /*
         * Initialize all LWLocks to "unlocked" state
         */
        for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
        {
                SpinLockInit(&lock->mutex);
                lock->releaseOK = true;
                lock->exclusive = 0;
                lock->shared = 0;
                lock->head = NULL;
                lock->tail = NULL;
        }

        /*
         * Initialize the dynamic-allocation counter at the end of the array
         */
        LWLockCounter = (int *) lock;
        LWLockCounter[0] = (int) NumFixedLWLocks;
        LWLockCounter[1] = numLocks;
}


/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * NB: we do not currently try to interlock this.  Could perhaps use
 * ShmemLock spinlock if there were any need to assign LWLockIds after
 * shmem setup.
 */
LWLockId
LWLockAssign(void)
{
        if (LWLockCounter[0] >= LWLockCounter[1])
                elog(FATAL, "no more LWLockIds available");
        return (LWLockId) (LWLockCounter[0]++);
}
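
#ifdef NOT_USED
/*
 * Minimal sketch of how a user-defined module might obtain and use a
 * dynamically-allocated LWLock.  The my_module_* names are hypothetical
 * and the block is compiled out; it only illustrates the intended
 * calling pattern.
 */
static LWLockId my_module_lock;

static void
MyModuleShmemInit(void)
{
        /* claim one of the spare LWLockIds anticipated in NumLWLocks() */
        my_module_lock = LWLockAssign();
}

static void
MyModuleUpdateSharedState(void)
{
        LWLockAcquire(my_module_lock, LW_EXCLUSIVE);
        /* ... manipulate the module's shared-memory state here ... */
        LWLockRelease(my_module_lock);
}
#endif   /* NOT_USED */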

/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
void
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
        volatile LWLock *lock = LWLockArray + lockid;
        PGPROC     *proc = MyProc;
        bool            retry = false;
        int                     extraWaits = 0;

        PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

        /*
         * We can't wait if we haven't got a PGPROC.  This should only occur
         * during bootstrap or shared memory initialization.  Put an Assert
         * here to catch unsafe coding practices.
         */
        Assert(!(proc == NULL && IsUnderPostmaster));

        /*
         * Lock out cancel/die interrupts until we exit the code section
         * protected by the LWLock.  This ensures that interrupts will not
         * interfere with manipulations of data structures in shared memory.
         */
        HOLD_INTERRUPTS();

        /*
         * Loop here to try to acquire lock after each time we are signaled by
         * LWLockRelease.
         *
         * NOTE: it might seem better to have LWLockRelease actually grant us the
         * lock, rather than retrying and possibly having to go back to sleep.
         * But in practice that is no good because it means a process swap for
         * every lock acquisition when two or more processes are contending
         * for the same lock.  Since LWLocks are normally used to protect
         * not-very-long sections of computation, a process needs to be able
         * to acquire and release the same lock many times during a single CPU
         * time slice, even in the presence of contention.  The efficiency of
         * being able to do that outweighs the inefficiency of sometimes
         * wasting a process dispatch cycle because the lock is not free when
         * a released waiter finally gets to run.  See pgsql-hackers archives
         * for 29-Dec-01.
         */
        for (;;)
        {
                bool            mustwait;

                /* Acquire mutex.  Time spent holding mutex should be short! */
                SpinLockAcquire_NoHoldoff(&lock->mutex);

                /* If retrying, allow LWLockRelease to release waiters again */
                if (retry)
                        lock->releaseOK = true;

                /* If I can get the lock, do so quickly. */
                if (mode == LW_EXCLUSIVE)
                {
                        if (lock->exclusive == 0 && lock->shared == 0)
                        {
                                lock->exclusive++;
                                mustwait = false;
                        }
                        else
                                mustwait = true;
                }
                else
                {
                        if (lock->exclusive == 0)
                        {
                                lock->shared++;
                                mustwait = false;
                        }
                        else
                                mustwait = true;
                }

                if (!mustwait)
                        break;                          /* got the lock */

                /*
                 * Add myself to wait queue.
                 *
                 * If we don't have a PGPROC structure, there's no way to wait. This
                 * should never occur, since MyProc should only be null during
                 * shared memory initialization.
                 */
                if (proc == NULL)
                        elog(FATAL, "cannot wait without a PGPROC structure");

                proc->lwWaiting = true;
                proc->lwExclusive = (mode == LW_EXCLUSIVE);
                proc->lwWaitLink = NULL;
                if (lock->head == NULL)
                        lock->head = proc;
                else
                        lock->tail->lwWaitLink = proc;
                lock->tail = proc;

                /* Can release the mutex now */
                SpinLockRelease_NoHoldoff(&lock->mutex);

                /*
                 * Wait until awakened.
                 *
                 * Since we share the process wait semaphore with the regular lock
                 * manager and ProcWaitForSignal, and we may need to acquire an
                 * LWLock while one of those is pending, it is possible that we
                 * get awakened for a reason other than being signaled by
                 * LWLockRelease.  If so, loop back and wait again.  Once we've
                 * gotten the LWLock, re-increment the sema by the number of
                 * additional signals received, so that the lock manager or signal
                 * manager will see the received signal when it next waits.
                 */
                LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");

                for (;;)
                {
                        /* "false" means cannot accept cancel/die interrupt here. */
                        PGSemaphoreLock(&proc->sem, false);
                        if (!proc->lwWaiting)
                                break;
                        extraWaits++;
                }

                LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");

                /* Now loop back and try to acquire lock again. */
                retry = true;
        }

        /* We are done updating shared state of the lock itself. */
        SpinLockRelease_NoHoldoff(&lock->mutex);

        /* Add lock to list of locks held by this backend */
        Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
        held_lwlocks[num_held_lwlocks++] = lockid;

        /*
         * Fix the process wait semaphore's count for any absorbed wakeups.
         */
        while (extraWaits-- > 0)
                PGSemaphoreUnlock(&proc->sem);
}
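
#ifdef NOT_USED
/*
 * Illustrative reader-side pattern (hypothetical names, compiled out):
 * any number of backends may hold the lock in LW_SHARED mode at once,
 * while LW_EXCLUSIVE excludes readers and writers alike.
 */
static int
MyModuleReadCounter(LWLockId lockid, volatile int *counter)
{
        int                     value;

        LWLockAcquire(lockid, LW_SHARED);
        value = *counter;
        LWLockRelease(lockid);
        return value;
}
#endif   /* NOT_USED */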

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
        volatile LWLock *lock = LWLockArray + lockid;
        bool            mustwait;

        PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);

        /*
         * Lock out cancel/die interrupts until we exit the code section
         * protected by the LWLock.  This ensures that interrupts will not
         * interfere with manipulations of data structures in shared memory.
         */
        HOLD_INTERRUPTS();

        /* Acquire mutex.  Time spent holding mutex should be short! */
        SpinLockAcquire_NoHoldoff(&lock->mutex);

        /* If I can get the lock, do so quickly. */
        if (mode == LW_EXCLUSIVE)
        {
                if (lock->exclusive == 0 && lock->shared == 0)
                {
                        lock->exclusive++;
                        mustwait = false;
                }
                else
                        mustwait = true;
        }
        else
        {
                if (lock->exclusive == 0)
                {
                        lock->shared++;
                        mustwait = false;
                }
                else
                        mustwait = true;
        }

        /* We are done updating shared state of the lock itself. */
        SpinLockRelease_NoHoldoff(&lock->mutex);

        if (mustwait)
        {
                /* Failed to get lock, so release interrupt holdoff */
                RESUME_INTERRUPTS();
                LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
        }
        else
        {
                /* Add lock to list of locks held by this backend */
                Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
                held_lwlocks[num_held_lwlocks++] = lockid;
        }

        return !mustwait;
}
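
#ifdef NOT_USED
/*
 * Illustrative try-lock pattern (hypothetical names, compiled out):
 * take the lock only if it is free, letting the caller do other useful
 * work instead of sleeping when it is busy.
 */
static bool
MyModuleTryUpdate(LWLockId lockid)
{
        if (!LWLockConditionalAcquire(lockid, LW_EXCLUSIVE))
                return false;                   /* busy; caller may retry later */
        /* ... manipulate shared state here ... */
        LWLockRelease(lockid);
        return true;
}
#endif   /* NOT_USED */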

/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLockId lockid)
{
        volatile LWLock *lock = LWLockArray + lockid;
        PGPROC     *head;
        PGPROC     *proc;
        int                     i;

        PRINT_LWDEBUG("LWLockRelease", lockid, lock);

        /*
         * Remove lock from list of locks held.  Usually, but not always, it
         * will be the latest-acquired lock; so search array backwards.
         */
        for (i = num_held_lwlocks; --i >= 0;)
        {
                if (lockid == held_lwlocks[i])
                        break;
        }
        if (i < 0)
                elog(ERROR, "lock %d is not held", (int) lockid);
        num_held_lwlocks--;
        for (; i < num_held_lwlocks; i++)
                held_lwlocks[i] = held_lwlocks[i + 1];

        /* Acquire mutex.  Time spent holding mutex should be short! */
        SpinLockAcquire_NoHoldoff(&lock->mutex);

        /* Release my hold on lock */
        if (lock->exclusive > 0)
                lock->exclusive--;
        else
        {
                Assert(lock->shared > 0);
                lock->shared--;
        }

        /*
         * See if I need to awaken any waiters.  If I released a non-last
         * shared hold, there cannot be anything to do.  Also, do not awaken
         * any waiters if someone has already awakened waiters that haven't
         * yet acquired the lock.
         */
        head = lock->head;
        if (head != NULL)
        {
                if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
                {
                        /*
                         * Remove the to-be-awakened PGPROCs from the queue.  If the
                         * front waiter wants exclusive lock, awaken him only.
                         * Otherwise awaken as many waiters as want shared access.
                         */
                        proc = head;
                        if (!proc->lwExclusive)
                        {
                                while (proc->lwWaitLink != NULL &&
                                           !proc->lwWaitLink->lwExclusive)
                                        proc = proc->lwWaitLink;
                        }
                        /* proc is now the last PGPROC to be released */
                        lock->head = proc->lwWaitLink;
                        proc->lwWaitLink = NULL;
                        /* prevent additional wakeups until retryer gets to run */
                        lock->releaseOK = false;
                }
                else
                {
                        /* lock is still held, can't awaken anything */
                        head = NULL;
                }
        }

        /* We are done updating shared state of the lock itself. */
        SpinLockRelease_NoHoldoff(&lock->mutex);

        /*
         * Awaken any waiters I removed from the queue.
         */
        while (head != NULL)
        {
                LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
                proc = head;
                head = proc->lwWaitLink;
                proc->lwWaitLink = NULL;
                proc->lwWaiting = false;
                PGSemaphoreUnlock(&proc->sem);
        }

        /*
         * Now okay to allow cancel/die interrupts.
         */
        RESUME_INTERRUPTS();
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after ereport(ERROR).  An important difference between
 * this function and retail LWLockRelease calls is that InterruptHoldoffCount
 * is unchanged by this operation.  This is necessary since
 * InterruptHoldoffCount has been set to an appropriate level earlier in
 * error recovery.  We could decrement it below zero if we allowed it to
 * drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
        while (num_held_lwlocks > 0)
        {
                HOLD_INTERRUPTS();              /* match the upcoming RESUME_INTERRUPTS */

                LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
        }
}


/*
 * LWLockHeldByMe - test whether my process currently holds a lock
 *
 * This is meant as debug support only.  We do not distinguish whether the
 * lock is held shared or exclusive.
 */
bool
LWLockHeldByMe(LWLockId lockid)
{
        int     i;

        for (i = 0; i < num_held_lwlocks; i++)
        {
                if (held_lwlocks[i] == lockid)
                        return true;
        }
        return false;
}
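
#ifdef NOT_USED
/*
 * Illustrative assertion pattern (hypothetical name, compiled out):
 * callers can use LWLockHeldByMe to document and verify a locking
 * precondition before touching a protected structure.
 */
static void
MyModuleInspectSharedState(LWLockId lockid)
{
        Assert(LWLockHeldByMe(lockid));
        /* ... safe to examine the structure protected by lockid ... */
}
#endif   /* NOT_USED */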