/*-------------------------------------------------------------------------
 *
 * lwlock.c
 *    Lightweight lock manager
 *
 * Lightweight locks are intended primarily to provide mutual exclusion of
 * access to shared-memory data structures.  Therefore, they offer both
 * exclusive and shared lock modes (to support read/write and read-only
 * access to a shared object).  There are few other frammishes.  User-level
 * locking should be done with the full lock manager --- which depends on
 * an LWLock to protect its shared state.
 *
 *
 * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lwlock.c,v 1.3 2001/11/05 17:46:28 momjian Exp $
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/clog.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/spin.h"


typedef struct LWLock
{
    slock_t     mutex;          /* Protects LWLock and queue of PROCs */
    char        exclusive;      /* # of exclusive holders (0 or 1) */
    int         shared;         /* # of shared holders (0..MaxBackends) */
    PROC       *head;           /* head of list of waiting PROCs */
    PROC       *tail;           /* tail of list of waiting PROCs */
    /* tail is undefined when head is NULL */
} LWLock;

/*
 * This points to the array of LWLocks in shared memory.  Backends inherit
 * the pointer by fork from the postmaster.  LWLockIds are indexes into
 * the array.
 */
static LWLock *LWLockArray = NULL;

/* shared counter for dynamic allocation of LWLockIds */
static int *LWLockCounter;


/*
 * We use this structure to keep track of locked LWLocks for release
 * during error recovery.  The maximum size could be determined at runtime
 * if necessary, but it seems unlikely that more than a few locks could
 * ever be held simultaneously.
 */
#define MAX_SIMUL_LWLOCKS   100

static int  num_held_lwlocks = 0;
static LWLockId held_lwlocks[MAX_SIMUL_LWLOCKS];


#ifdef LOCK_DEBUG
bool        Trace_lwlocks = false;

inline static void
PRINT_LWDEBUG(const char *where, LWLockId lockid, const LWLock *lock)
{
    if (Trace_lwlocks)
        elog(DEBUG, "%s(%d): excl %d shared %d head %p",
             where, (int) lockid,
             (int) lock->exclusive, lock->shared, lock->head);
}

#else                           /* not LOCK_DEBUG */
#define PRINT_LWDEBUG(a,b,c)
#endif   /* LOCK_DEBUG */


/*
 * Compute number of LWLocks to allocate.
 */
int
NumLWLocks(void)
{
    int         numLocks;

    /*
     * Possibly this logic should be spread out among the affected
     * modules, the same way that shmem space estimation is done.  But for
     * now, there are few enough users of LWLocks that we can get away
     * with just keeping the knowledge here.
     */

    /* Predefined LWLocks */
    numLocks = (int) NumFixedLWLocks;

    /* bufmgr.c needs two for each shared buffer */
    numLocks += 2 * NBuffers;

    /* clog.c needs one per CLOG buffer */
    numLocks += NUM_CLOG_BUFFERS;

    /* Perhaps create a few more for use by user-defined modules? */

    return numLocks;
}


/*
 * Compute shmem space needed for LWLocks.
 */
int
LWLockShmemSize(void)
{
    int         numLocks = NumLWLocks();
    uint32      spaceLocks;

    /* Allocate the LWLocks plus space for shared allocation counter. */
    spaceLocks = numLocks * sizeof(LWLock) + 2 * sizeof(int);
    spaceLocks = MAXALIGN(spaceLocks);

    return (int) spaceLocks;
}


/*
 * Allocate shmem space for LWLocks and initialize the locks.
 */
void
CreateLWLocks(void)
{
    int         numLocks = NumLWLocks();
    uint32      spaceLocks = LWLockShmemSize();
    LWLock     *lock;
    int         id;

    /* Allocate space */
    LWLockArray = (LWLock *) ShmemAlloc(spaceLocks);

    /*
     * Initialize all LWLocks to "unlocked" state
     */
    for (id = 0, lock = LWLockArray; id < numLocks; id++, lock++)
    {
        SpinLockInit(&lock->mutex);
        lock->exclusive = 0;
        lock->shared = 0;
        lock->head = NULL;
        lock->tail = NULL;
    }

    /*
     * Initialize the dynamic-allocation counter at the end of the array
     */
    LWLockCounter = (int *) lock;
    LWLockCounter[0] = (int) NumFixedLWLocks;
    LWLockCounter[1] = numLocks;
}


/*
 * LWLockAssign - assign a dynamically-allocated LWLock number
 *
 * NB: we do not currently try to interlock this.  Could perhaps use
 * ShmemLock spinlock if there were any need to assign LWLockIds after
 * shmem setup.
 */
LWLockId
LWLockAssign(void)
{
    if (LWLockCounter[0] >= LWLockCounter[1])
        elog(FATAL, "No more LWLockIds available");
    return (LWLockId) (LWLockCounter[0]++);
}
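
/*
 * Illustrative sketch only, not part of this module: a user-defined module
 * would typically capture its LWLockId once, during shared-memory setup,
 * and let backends inherit the value by fork from the postmaster.  The
 * names LWLOCK_USAGE_EXAMPLE, MyModuleShmemInit, and my_module_lock are
 * hypothetical, and NumLWLocks() above would need to reserve spare slots
 * for user-defined modules before this assignment could succeed.
 */
#ifdef LWLOCK_USAGE_EXAMPLE
static LWLockId my_module_lock;

static void
MyModuleShmemInit(void)
{
    /* grab one of the dynamically-allocatable LWLockIds */
    my_module_lock = LWLockAssign();
}
#endif   /* LWLOCK_USAGE_EXAMPLE */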


/*
 * LWLockAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, sleep until it is.
 *
 * Side effect: cancel/die interrupts are held off until lock release.
 */
void
LWLockAcquire(LWLockId lockid, LWLockMode mode)
{
    LWLock     *lock = LWLockArray + lockid;
    bool        mustwait;

    PRINT_LWDEBUG("LWLockAcquire", lockid, lock);

    /*
     * Lock out cancel/die interrupts until we exit the code section
     * protected by the LWLock.  This ensures that interrupts will not
     * interfere with manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /* Acquire mutex.  Time spent holding mutex should be short! */
    SpinLockAcquire_NoHoldoff(&lock->mutex);

    /* If I can get the lock, do so quickly. */
    if (mode == LW_EXCLUSIVE)
    {
        if (lock->exclusive == 0 && lock->shared == 0)
        {
            lock->exclusive++;
            mustwait = false;
        }
        else
            mustwait = true;
    }
    else
    {
        /*
         * If there is someone waiting (presumably for exclusive access),
         * queue up behind him even though I could get the lock.  This
         * prevents a stream of read locks from starving a writer.
         */
        if (lock->exclusive == 0 && lock->head == NULL)
        {
            lock->shared++;
            mustwait = false;
        }
        else
            mustwait = true;
    }

    if (mustwait)
    {
        /* Add myself to wait queue */
        PROC       *proc = MyProc;
        int         extraWaits = 0;

        /*
         * If we don't have a PROC structure, there's no way to wait. This
         * should never occur, since MyProc should only be null during
         * shared memory initialization.
         */
        if (proc == NULL)
            elog(FATAL, "LWLockAcquire: can't wait without a PROC structure");

        proc->lwWaiting = true;
        proc->lwExclusive = (mode == LW_EXCLUSIVE);
        proc->lwWaitLink = NULL;
        if (lock->head == NULL)
            lock->head = proc;
        else
            lock->tail->lwWaitLink = proc;
        lock->tail = proc;

        /* Can release the mutex now */
        SpinLockRelease_NoHoldoff(&lock->mutex);

        /*
         * Wait until awakened.
         *
         * Since we share the process wait semaphore with the regular lock
         * manager and ProcWaitForSignal, and we may need to acquire an
         * LWLock while one of those is pending, it is possible that we
         * get awakened for a reason other than being granted the LWLock.
         * If so, loop back and wait again.  Once we've gotten the lock,
         * re-increment the sema by the number of additional signals
         * received, so that the lock manager or signal manager will see
         * the received signal when it next waits.
         */
        for (;;)
        {
            /* "false" means cannot accept cancel/die interrupt here. */
            IpcSemaphoreLock(proc->sem.semId, proc->sem.semNum, false);
            if (!proc->lwWaiting)
                break;
            extraWaits++;
        }

        /*
         * The awakener already updated the lock struct's state, so we
         * don't need to do anything more to it.  Just need to fix the
         * semaphore count.
         */
        while (extraWaits-- > 0)
            IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum);
    }
    else
    {
        /* Got the lock without waiting */
        SpinLockRelease_NoHoldoff(&lock->mutex);
    }

    /* Add lock to list of locks held by this backend */
    Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
    held_lwlocks[num_held_lwlocks++] = lockid;
}
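
/*
 * Illustrative sketch only, not part of this module: the normal calling
 * pattern is to bracket access to a shared-memory structure with
 * LWLockAcquire and LWLockRelease, using LW_EXCLUSIVE for updates and the
 * shared mode (declared in storage/lwlock.h) for read-only access.  The
 * guard macro, function, and counter below are hypothetical.
 */
#ifdef LWLOCK_USAGE_EXAMPLE
static void
ExampleBumpSharedCounter(LWLockId lockid, volatile int *counter)
{
    LWLockAcquire(lockid, LW_EXCLUSIVE);
    (*counter)++;               /* shared-memory update, now safe */
    LWLockRelease(lockid);
}
#endif   /* LWLOCK_USAGE_EXAMPLE */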

/*
 * LWLockConditionalAcquire - acquire a lightweight lock in the specified mode
 *
 * If the lock is not available, return FALSE with no side-effects.
 *
 * If successful, cancel/die interrupts are held off until lock release.
 */
bool
LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode)
{
    LWLock     *lock = LWLockArray + lockid;
    bool        mustwait;

    PRINT_LWDEBUG("LWLockConditionalAcquire", lockid, lock);

    /*
     * Lock out cancel/die interrupts until we exit the code section
     * protected by the LWLock.  This ensures that interrupts will not
     * interfere with manipulations of data structures in shared memory.
     */
    HOLD_INTERRUPTS();

    /* Acquire mutex.  Time spent holding mutex should be short! */
    SpinLockAcquire_NoHoldoff(&lock->mutex);

    /* If I can get the lock, do so quickly. */
    if (mode == LW_EXCLUSIVE)
    {
        if (lock->exclusive == 0 && lock->shared == 0)
        {
            lock->exclusive++;
            mustwait = false;
        }
        else
            mustwait = true;
    }
    else
    {
        /*
         * If there is someone waiting (presumably for exclusive access),
         * queue up behind him even though I could get the lock.  This
         * prevents a stream of read locks from starving a writer.
         */
        if (lock->exclusive == 0 && lock->head == NULL)
        {
            lock->shared++;
            mustwait = false;
        }
        else
            mustwait = true;
    }

    /* We are done updating shared state of the lock itself. */
    SpinLockRelease_NoHoldoff(&lock->mutex);

    if (mustwait)
    {
        /* Failed to get lock, so release interrupt holdoff */
        RESUME_INTERRUPTS();
    }
    else
    {
        /* Add lock to list of locks held by this backend */
        Assert(num_held_lwlocks < MAX_SIMUL_LWLOCKS);
        held_lwlocks[num_held_lwlocks++] = lockid;
    }

    return !mustwait;
}
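
/*
 * Illustrative sketch only, not part of this module: LWLockConditionalAcquire
 * supports a try-lock pattern in which the caller does something else rather
 * than sleep when the lock is busy.  The guard macro, function, and counter
 * below are hypothetical.
 */
#ifdef LWLOCK_USAGE_EXAMPLE
static bool
ExampleTryBumpSharedCounter(LWLockId lockid, volatile int *counter)
{
    if (!LWLockConditionalAcquire(lockid, LW_EXCLUSIVE))
        return false;           /* lock busy; caller may retry later */
    (*counter)++;
    LWLockRelease(lockid);
    return true;
}
#endif   /* LWLOCK_USAGE_EXAMPLE */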

/*
 * LWLockRelease - release a previously acquired lock
 */
void
LWLockRelease(LWLockId lockid)
{
    LWLock     *lock = LWLockArray + lockid;
    PROC       *head;
    PROC       *proc;
    int         i;

    PRINT_LWDEBUG("LWLockRelease", lockid, lock);

    /*
     * Remove lock from list of locks held.  Usually, but not always, it
     * will be the latest-acquired lock; so search array backwards.
     */
    for (i = num_held_lwlocks; --i >= 0;)
    {
        if (lockid == held_lwlocks[i])
            break;
    }
    if (i < 0)
        elog(ERROR, "LWLockRelease: lock %d is not held", (int) lockid);
    num_held_lwlocks--;
    for (; i < num_held_lwlocks; i++)
        held_lwlocks[i] = held_lwlocks[i + 1];

    /* Acquire mutex.  Time spent holding mutex should be short! */
    SpinLockAcquire_NoHoldoff(&lock->mutex);

    /* Release my hold on lock */
    if (lock->exclusive > 0)
        lock->exclusive--;
    else
    {
        Assert(lock->shared > 0);
        lock->shared--;
    }

    /*
     * See if I need to awaken any waiters.  If I released a non-last
     * shared hold, there cannot be anything to do.
     */
    head = lock->head;
    if (head != NULL)
    {
        if (lock->exclusive == 0 && lock->shared == 0)
        {
            /*
             * Remove the to-be-awakened PROCs from the queue, and update
             * the lock state to show them as holding the lock.
             */
            proc = head;
            if (proc->lwExclusive)
                lock->exclusive++;
            else
            {
                lock->shared++;
                while (proc->lwWaitLink != NULL &&
                       !proc->lwWaitLink->lwExclusive)
                {
                    proc = proc->lwWaitLink;
                    lock->shared++;
                }
            }
            /* proc is now the last PROC to be released */
            lock->head = proc->lwWaitLink;
            proc->lwWaitLink = NULL;
        }
        else
        {
            /* lock is still held, can't awaken anything */
            head = NULL;
        }
    }

    /* We are done updating shared state of the lock itself. */
    SpinLockRelease_NoHoldoff(&lock->mutex);

    /*
     * Awaken any waiters I removed from the queue.
     */
    while (head != NULL)
    {
        proc = head;
        head = proc->lwWaitLink;
        proc->lwWaitLink = NULL;
        proc->lwWaiting = false;
        IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum);
    }

    /*
     * Now okay to allow cancel/die interrupts.
     */
    RESUME_INTERRUPTS();
}


/*
 * LWLockReleaseAll - release all currently-held locks
 *
 * Used to clean up after elog(ERROR).  An important difference between this
 * function and retail LWLockRelease calls is that InterruptHoldoffCount is
 * unchanged by this operation.  This is necessary since InterruptHoldoffCount
 * has been set to an appropriate level earlier in error recovery.  We could
 * decrement it below zero if we allow it to drop for each released lock!
 */
void
LWLockReleaseAll(void)
{
    while (num_held_lwlocks > 0)
    {
        HOLD_INTERRUPTS();      /* match the upcoming RESUME_INTERRUPTS */

        LWLockRelease(held_lwlocks[num_held_lwlocks - 1]);
    }
}