/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *	  Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * an LWLock to protect its shared state.
 *
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.10 2002/05/05 00:03:28 tgl Exp $
 *
 *-------------------------------------------------------------------------
 */
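
/*
 * Typical usage, sketched here for orientation (BufMgrLock stands in for
 * any predefined or dynamically-assigned LWLockId):
 *
 *		LWLockAcquire(BufMgrLock, LW_EXCLUSIVE);
 *		... examine or update shared buffer-manager state ...
 *		LWLockRelease(BufMgrLock);
 *
 * Any locks still held at elog(ERROR) time are cleaned up by
 * LWLockReleaseAll() during error recovery.
 */
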
#include "postgres.h"

#include "access/clog.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"


typedef struct LWLock
{
	slock_t		mutex;			/* Protects LWLock and queue of PROCs */
	bool		releaseOK;		/* T if ok to release waiters */
	char		exclusive;		/* # of exclusive holders (0 or 1) */
	int			shared;			/* # of shared holders (0..MaxBackends) */
	PROC	   *head;			/* head of list of waiting PROCs */
	PROC	   *tail;			/* tail of list of waiting PROCs */
	/* tail is undefined when head is NULL */
} LWLock;

/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster.  LWLockIds are indexes into
 * the array.
 */
static LWLock *LWLockArray = NULL;

/* shared counter for dynamic allocation of LWLockIds */
static int *LWLockCounter;


/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS	100

static int	num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];


#ifdef LOCK_DEBUG
bool		Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const volatile LWLock *lock)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): excl %d shared %d head %p rOK %d",
			 where, (int) lockid,
			 (int) lock->exclusive, lock->shared, lock->head,
			 (int) lock->releaseOK);
}

inline static void
LOG_LWDEBUG(const char *where, LWLockId lockid, const char *msg)
{
	if (Trace_lwlocks)
		elog(LOG, "%s(%d): %s", where, (int) lockid, msg);
}

#else							/* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#define LOG_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */


/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
	int			numLocks;

	/*
	 * Possibly this logic should be spread out among the affected
	 * modules, the same way that shmem space estimation is done.  But for
	 * now, there are few enough users of LWLocks that we can get away
	 * with just keeping the knowledge here.
	 */

	/* Predefined LWLocks */
	numLocks = (int) NumFixedLWLocks;

	/* bufmgr.c needs two for each shared buffer */
	numLocks += 2 * NBuffers;

	/* clog.c needs one per CLOG buffer */
	numLocks += NUM_CLOG_BUFFERS;

	/* Perhaps create a few more for use by user-defined modules? */

	return numLocks;
}
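
/*
 * Illustrative count only (the values are configuration-dependent): with,
 * say, NBuffers = 64 and NUM_CLOG_BUFFERS = 8, NumLWLocks() returns
 * NumFixedLWLocks + 2*64 + 8 locks in total.
 */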


/*
 * Compute shmem space needed for LWLocks.
 */
int
LWLockShmemSize(void)
{
	int			numLocks = NumLWLocks();
	uint32		spaceLocks;

	/* Allocate the LWLocks plus space for shared allocation counter. */
	spaceLocks = numLocks * sizeof(LWLock) + 2 * sizeof(int);
	spaceLocks = MAXALIGN(spaceLocks);

	return (int) spaceLocks;
}
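
/*
 * The resulting layout, as initialized by CreateLWLocks() below:
 *
 *		[ LWLock 0 | LWLock 1 | ... | LWLock numLocks-1 | int | int ]
 *
 * where the two trailing ints become LWLockCounter[0] (next LWLockId to
 * assign) and LWLockCounter[1] (total number of LWLocks).
 */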


/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
	int			numLocks = NumLWLocks();
	uint32		spaceLocks = LWLockShmemSize();
	LWLock	   *lock;
	int			id;

	/* Allocate space */
	LWLockArray = (LWLock *) ShmemAlloc(spaceLocks);

	/*
	 * Initialize all LWLocks to "unlocked" state
	 */
	for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
	{
		SpinLockInit(&lock->mutex);
		lock->releaseOK = true;
		lock->exclusive = 0;
		lock->shared = 0;
		lock->head = NULL;
		lock->tail = NULL;
	}

	/*
	 * Initialize the dynamic-allocation counter at the end of the array
	 */
	LWLockCounter = (int *) lock;
	LWLockCounter[0] = (int) NumFixedLWLocks;
	LWLockCounter[1] = numLocks;
}


/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * NB: we do not currently try to interlock this.  Could perhaps use
 * ShmemLock spinlock if there were any need to assign LWLockIds after
 * shmem setup.
 */
LWLockId
LWLockAssign(void)
{
	if (LWLockCounter[0] >= LWLockCounter[1])
		elog(FATAL, "No more LWLockIds available");
	return (LWLockId) (LWLockCounter[0]++);
}
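
/*
 * A minimal caller sketch (the names are hypothetical): a module that has
 * arranged for an extra slot in NumLWLocks() reserves it during
 * shared-memory setup, then uses the returned id like any fixed lock:
 *
 *		static LWLockId my_module_lock;
 *
 *		my_module_lock = LWLockAssign();
 *		...
 *		LWLockAcquire(my_module_lock, LW_SHARED);
 *		... read the module's shared state ...
 *		LWLockRelease(my_module_lock);
 */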


/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
void
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = LWLockArray + lockid;
	PROC	   *proc = MyProc;
	bool		retry = false;
	int			extraWaits = 0;

	PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Loop here to try to acquire lock after each time we are signaled
	 * by LWLockRelease.
	 *
	 * NOTE: it might seem better to have LWLockRelease actually grant us
	 * the lock, rather than retrying and possibly having to go back to
	 * sleep.  But in practice that is no good because it means a process
	 * swap for every lock acquisition when two or more processes are
	 * contending for the same lock.  Since LWLocks are normally used to
	 * protect not-very-long sections of computation, a process needs to
	 * be able to acquire and release the same lock many times during a
	 * single CPU time slice, even in the presence of contention.  The
	 * efficiency of being able to do that outweighs the inefficiency of
	 * sometimes wasting a process dispatch cycle because the lock is not
	 * free when a released waiter finally gets to run.  See pgsql-hackers
	 * archives for 29-Dec-01.
	 */
	for (;;)
	{
		bool		mustwait;

		/* Acquire mutex.  Time spent holding mutex should be short! */
		SpinLockAcquire_NoHoldoff(&lock->mutex);

		/* If retrying, allow LWLockRelease to release waiters again */
		if (retry)
			lock->releaseOK = true;

		/* If I can get the lock, do so quickly. */
		if (mode == LW_EXCLUSIVE)
		{
			if (lock->exclusive == 0 && lock->shared == 0)
			{
				lock->exclusive++;
				mustwait = false;
			}
			else
				mustwait = true;
		}
		else
		{
			if (lock->exclusive == 0)
			{
				lock->shared++;
				mustwait = false;
			}
			else
				mustwait = true;
		}

		if (!mustwait)
			break;				/* got the lock */

		/*
		 * Add myself to wait queue.
		 *
		 * If we don't have a PROC structure, there's no way to wait. This
		 * should never occur, since MyProc should only be null during
		 * shared memory initialization.
		 */
		if (proc == NULL)
			elog(FATAL, "LWLockAcquire: can't wait without a PROC structure");

		proc->lwWaiting = true;
		proc->lwExclusive = (mode == LW_EXCLUSIVE);
		proc->lwWaitLink = NULL;
		if (lock->head == NULL)
			lock->head = proc;
		else
			lock->tail->lwWaitLink = proc;
		lock->tail = proc;

		/* Can release the mutex now */
		SpinLockRelease_NoHoldoff(&lock->mutex);

		/*
		 * Wait until awakened.
		 *
		 * Since we share the process wait semaphore with the regular lock
		 * manager and ProcWaitForSignal, and we may need to acquire an
		 * LWLock while one of those is pending, it is possible that we get
		 * awakened for a reason other than being signaled by LWLockRelease.
		 * If so, loop back and wait again.  Once we've gotten the LWLock,
		 * re-increment the sema by the number of additional signals
		 * received, so that the lock manager or signal manager will see
		 * the received signal when it next waits.
		 */
		LOG_LWDEBUG("LWLockAcquire", lockid, "waiting");

		for (;;)
		{
			/* "false" means cannot accept cancel/die interrupt here. */
			PGSemaphoreLock(&proc->sem, false);
			if (!proc->lwWaiting)
				break;
			extraWaits++;
		}

		LOG_LWDEBUG("LWLockAcquire", lockid, "awakened");

		/* Now loop back and try to acquire lock again. */
		retry = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/* Add lock to list of locks held by this backend */
	Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
	held_lwlocks[num_held_lwlocks++] = lockid;

	/*
	 * Fix the process wait semaphore's count for any absorbed wakeups.
	 */
	while (extraWaits-- > 0)
		PGSemaphoreUnlock(&proc->sem);
}

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
	volatile LWLock *lock = LWLockArray + lockid;
	bool		mustwait;

	PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);

	/*
	 * Lock out cancel/die interrupts until we exit the code section
	 * protected by the LWLock.  This ensures that interrupts will not
	 * interfere with manipulations of data structures in shared memory.
	 */
	HOLD_INTERRUPTS();

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* If I can get the lock, do so quickly. */
	if (mode == LW_EXCLUSIVE)
	{
		if (lock->exclusive == 0 && lock->shared == 0)
		{
			lock->exclusive++;
			mustwait = false;
		}
		else
			mustwait = true;
	}
	else
	{
		if (lock->exclusive == 0)
		{
			lock->shared++;
			mustwait = false;
		}
		else
			mustwait = true;
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	if (mustwait)
	{
		/* Failed to get lock, so release interrupt holdoff */
		RESUME_INTERRUPTS();
		LOG_LWDEBUG("LWLockConditionalAcquire", lockid, "failed");
	}
	else
	{
		/* Add lock to list of locks held by this backend */
		Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
		held_lwlocks[num_held_lwlocks++] = lockid;
	}

	return !mustwait;
}
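
/*
 * A usage sketch (the caller code is hypothetical): try for the lock, and
 * do other work rather than sleeping if it is busy.  Note that on failure
 * the interrupt holdoff has already been released, so the caller does not
 * call RESUME_INTERRUPTS() itself:
 *
 *		if (LWLockConditionalAcquire(lockid, LW_EXCLUSIVE))
 *		{
 *			... update the shared structure ...
 *			LWLockRelease(lockid);
 *		}
 *		else
 *			... skip or postpone the update ...
 */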

/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLockId lockid)
{
	volatile LWLock *lock = LWLockArray + lockid;
	PROC	   *head;
	PROC	   *proc;
	int			i;

	PRINT_LWDEBUG("LWLockRelease", lockid, lock);

	/*
	 * Remove lock from list of locks held.  Usually, but not always, it
	 * will be the latest-acquired lock; so search array backwards.
	 */
	for (i = num_held_lwlocks; --i >= 0;)
	{
		if (lockid == held_lwlocks[i])
			break;
	}
	if (i < 0)
		elog(ERROR, "LWLockRelease: lock %d is not held", (int) lockid);
	num_held_lwlocks--;
	for (; i < num_held_lwlocks; i++)
		held_lwlocks[i] = held_lwlocks[i + 1];

	/* Acquire mutex.  Time spent holding mutex should be short! */
	SpinLockAcquire_NoHoldoff(&lock->mutex);

	/* Release my hold on lock */
	if (lock->exclusive > 0)
		lock->exclusive--;
	else
	{
		Assert(lock->shared > 0);
		lock->shared--;
	}

	/*
	 * See if I need to awaken any waiters.  If I released a non-last
	 * shared hold, there cannot be anything to do.  Also, do not awaken
	 * any waiters if someone has already awakened waiters that haven't
	 * yet acquired the lock.
	 */
	head = lock->head;
	if (head != NULL)
	{
		if (lock->exclusive == 0 && lock->shared == 0 && lock->releaseOK)
		{
			/*
			 * Remove the to-be-awakened PROCs from the queue.  If the
			 * front waiter wants exclusive lock, awaken him only.
			 * Otherwise awaken as many waiters as want shared access.
			 */
			proc = head;
			if (!proc->lwExclusive)
			{
				while (proc->lwWaitLink != NULL &&
					   !proc->lwWaitLink->lwExclusive)
				{
					proc = proc->lwWaitLink;
				}
			}
			/* proc is now the last PROC to be released */
			lock->head = proc->lwWaitLink;
			proc->lwWaitLink = NULL;
			/* prevent additional wakeups until retryer gets to run */
			lock->releaseOK = false;
		}
		else
		{
			/* lock is still held, can't awaken anything */
			head = NULL;
		}
	}

	/* We are done updating shared state of the lock itself. */
	SpinLockRelease_NoHoldoff(&lock->mutex);

	/*
	 * Awaken any waiters I removed from the queue.
	 */
	while (head != NULL)
	{
		LOG_LWDEBUG("LWLockRelease", lockid, "release waiter");
		proc = head;
		head = proc->lwWaitLink;
		proc->lwWaitLink = NULL;
		proc->lwWaiting = false;
		PGSemaphoreUnlock(&proc->sem);
	}

	/*
	 * Now okay to allow cancel/die interrupts.
	 */
	RESUME_INTERRUPTS();
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after elog(ERROR).  An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery.  We could
 * decrement it below zero if we allow it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
	while (num_held_lwlocks > 0)
	{
		HOLD_INTERRUPTS();		/* match the upcoming RESUME_INTERRUPTS */

		LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
	}
}