]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/proc.c
Make functions static where possible, enclose unused functions in #ifdef NOT_USED.
[postgresql] / src / backend / storage / lmgr / proc.c
1 /*-------------------------------------------------------------------------
2  *
3  * proc.c--
4  *    routines to manage per-process shared memory data structure
5  *
6  * Copyright (c) 1994, Regents of the University of California
7  *
8  *
9  * IDENTIFICATION
10  *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.18 1997/08/19 21:33:29 momjian Exp $
11  *
12  *-------------------------------------------------------------------------
13  */
14 /*
15  *  Each postgres backend gets one of these.  We'll use it to
16  *  clean up after the process should the process suddenly die.
17  *
18  *
19  * Interface (a):
20  *      ProcSleep(), ProcWakeup(), ProcWakeupNext(),
21  *      ProcQueueAlloc() -- create a shm queue for sleeping processes
22  *      ProcQueueInit() -- create a queue without allocing memory
23  *
24  * Locking and waiting for buffers can cause the backend to be
25  * put to sleep.  Whoever releases the lock, etc. wakes the
26  * process up again (and gives it an error code so it knows
27  * whether it was awoken on an error condition).
28  *
29  * Interface (b):
30  *
31  * ProcReleaseLocks -- frees the locks associated with this process,
32  * ProcKill -- destroys the shared memory state (and locks)
33  *      associated with the process.
34  *
35  * 5/15/91 -- removed the buffer pool based lock chain in favor
36  *      of a shared memory lock chain.  The write-protection is
37  *      more expensive if the lock chain is in the buffer pool.
38  *      The only reason I kept the lock chain in the buffer pool
39  *      in the first place was to allow the lock table to grow larger
40  *      than available shared memory and that isn't going to work
41  *      without a lot of unimplemented support anyway.
42  *
43  * 4/7/95 -- instead of allocating a set of 1 semaphore per process, we
44  *      allocate a semaphore from a set of PROC_NSEMS_PER_SET semaphores
45  *      shared among backends (we keep a few sets of semaphores around).
46  *      This is so that we can support more backends. (system-wide semaphore
47  *      sets run out pretty fast.)                -ay 4/95
48  *
49  * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.18 1997/08/19 21:33:29 momjian Exp $
50  */
51 #include <sys/time.h>
52 #include <unistd.h>
53 #include <string.h>
54 #include <signal.h>
55 #include <sys/types.h>
56
57 #if defined(sparc_solaris)
58 #include <sys/ipc.h>
59 #include <sys/sem.h>
60 #endif
61
62 #include "postgres.h"
63 #include "miscadmin.h"
64 #include "libpq/pqsignal.h"
65
66 #include "access/xact.h"
67 #include "utils/hsearch.h"
68
69 #include "storage/ipc.h"
70 /* In Ultrix, sem.h must be included after ipc.h */
71 #include <sys/sem.h>
72 #include "storage/buf.h"        
73 #include "storage/lock.h"
74 #include "storage/lmgr.h"
75 #include "storage/shmem.h"
76 #include "storage/spin.h"
77 #include "storage/proc.h"
78
79 static void HandleDeadLock(int sig);
80 static PROC *ProcWakeup(PROC *proc, int errType);
81
82 /*
83  * timeout (in seconds) for resolving possible deadlock
84  */
85 #ifndef DEADLOCK_TIMEOUT
86 #define DEADLOCK_TIMEOUT        60
87 #endif
88
89 /* --------------------
90  * Spin lock for manipulating the shared process data structure:
91  * ProcGlobal.... Adding an extra spin lock seemed like the smallest
92  * hack to get around reading and updating this structure in shared
93  * memory. -mer 17 July 1991
94  * --------------------
95  */
96 SPINLOCK ProcStructLock;
97
98 /*
99  * For cleanup routines.  Don't cleanup if the initialization
100  * has not happened.
101  */
102 static bool     ProcInitialized = FALSE;
103
104 static PROC_HDR *ProcGlobal = NULL;
105
106 PROC    *MyProc = NULL;
107
108 static void ProcKill(int exitStatus, int pid);
109 static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
110 static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
111 /*
112  * InitProcGlobal -
113  *    initializes the global process table. We put it here so that
114  *    the postmaster can do this initialization. (ProcFreeAllSem needs
115  *    to read this table on exiting the postmaster. If we have the first
116  *    backend do this, starting up and killing the postmaster without
117  *    starting any backends will be a problem.)
118  */
119 void
120 InitProcGlobal(IPCKey key)
121 {
122     bool found = false;
123
124     /* attach to the free list */
125     ProcGlobal = (PROC_HDR *)
126         ShmemInitStruct("Proc Header",(unsigned)sizeof(PROC_HDR),&found);
127
128     /* --------------------
129      * We're the first - initialize.
130      * --------------------
131      */
132     if (! found)
133         {
134             int i;
135
136             ProcGlobal->numProcs = 0;
137             ProcGlobal->freeProcs = INVALID_OFFSET;
138             ProcGlobal->currKey = IPCGetProcessSemaphoreInitKey(key);
139             for (i=0; i < MAX_PROC_SEMS/PROC_NSEMS_PER_SET; i++)
140                 ProcGlobal->freeSemMap[i] = 0;
141         }
142 }
143
144 /* ------------------------
145  * InitProc -- create a per-process data structure for this process
146  * used by the lock manager on semaphore queues.
147  * ------------------------
148  */
149 void
150 InitProcess(IPCKey key)
151 {
152     bool found = false;
153     int pid;
154     int semstat;
155     unsigned long location, myOffset;
156     
157     /* ------------------
158      * Routine called if deadlock timer goes off. See ProcSleep()
159      * ------------------
160      */
161     pqsignal(SIGALRM, HandleDeadLock);
162
163     SpinAcquire(ProcStructLock);
164     
165     /* attach to the free list */
166     ProcGlobal = (PROC_HDR *)
167         ShmemInitStruct("Proc Header",(unsigned)sizeof(PROC_HDR),&found);
168     if (!found) {
169         /* this should not happen. InitProcGlobal() is called before this. */
170         elog(WARN, "InitProcess: Proc Header uninitialized");
171     }
172     
173     if (MyProc != NULL)
174         {
175             SpinRelease(ProcStructLock);
176             elog(WARN,"ProcInit: you already exist");
177             return;
178         }
179     
180     /* try to get a proc from the free list first */
181     
182     myOffset = ProcGlobal->freeProcs;
183     
184     if (myOffset != INVALID_OFFSET)
185         {
186             MyProc = (PROC *) MAKE_PTR(myOffset);
187             ProcGlobal->freeProcs = MyProc->links.next;
188         }
189     else
190         {
191             /* have to allocate one.  We can't use the normal binding
192              * table mechanism because the proc structure is stored
193              * by PID instead of by a global name (need to look it
194              * up by PID when we cleanup dead processes).
195              */
196             
197             MyProc = (PROC *) ShmemAlloc((unsigned)sizeof(PROC));
198             if (! MyProc)
199                 {
200                     SpinRelease(ProcStructLock);
201                     elog (FATAL,"cannot create new proc: out of memory");
202                 }
203             
204             /* this cannot be initialized until after the buffer pool */
205             SHMQueueInit(&(MyProc->lockQueue));
206             MyProc->procId = ProcGlobal->numProcs;
207             ProcGlobal->numProcs++;
208         }
209     
210     /*
211      * zero out the spin lock counts and set the sLocks field for
212      * ProcStructLock to 1 as we have acquired this spinlock above but 
213      * didn't record it since we didn't have MyProc until now.
214      */
215     memset(MyProc->sLocks, 0, sizeof(MyProc->sLocks));
216     MyProc->sLocks[ProcStructLock] = 1;
217
218
219     if (IsUnderPostmaster) {
220         IPCKey semKey;
221         int semNum;
222         int semId;
223         union semun semun;
224
225         ProcGetNewSemKeyAndNum(&semKey, &semNum);
226         
227         semId = IpcSemaphoreCreate(semKey,
228                                    PROC_NSEMS_PER_SET,
229                                    IPCProtection,
230                                    IpcSemaphoreDefaultStartValue,
231                                    0,
232                                    &semstat);
233         /*
234          * we might be reusing a semaphore that belongs to a dead
235          * backend. So be careful and reinitialize its value here.
236          */
237         semun.val = IpcSemaphoreDefaultStartValue;
238         semctl(semId, semNum, SETVAL, semun);
239
240         IpcSemaphoreLock(semId, semNum, IpcExclusiveLock);
241         MyProc->sem.semId = semId;
242         MyProc->sem.semNum = semNum;
243         MyProc->sem.semKey = semKey;
244     } else {
245         MyProc->sem.semId = -1;
246     }
247     
248     /* ----------------------
249      * Release the lock.
250      * ----------------------
251      */
252     SpinRelease(ProcStructLock);
253     
254     MyProc->pid = 0;
255     MyProc->xid = InvalidTransactionId;
256 #if 0
257     MyProc->pid = MyPid;
258 #endif
259     
260     /* ----------------
261      * Start keeping spin lock stats from here on.  Any botch before
262      * this initialization is forever botched
263      * ----------------
264      */
265     memset(MyProc->sLocks, 0, MAX_SPINS*sizeof(*MyProc->sLocks));
266     
267     /* -------------------------
268      * Install ourselves in the binding table.  The name to
269      * use is determined by the OS-assigned process id.  That
270      * allows the cleanup process to find us after any untimely
271      * exit.
272      * -------------------------
273      */
274     pid = getpid();
275     location = MAKE_OFFSET(MyProc);
276     if ((! ShmemPIDLookup(pid,&location)) || (location != MAKE_OFFSET(MyProc)))
277         {
278             elog(FATAL,"InitProc: ShmemPID table broken");
279         }
280     
281     MyProc->errType = NO_ERROR;
282     SHMQueueElemInit(&(MyProc->links));
283     
284     on_exitpg(ProcKill, (caddr_t)pid);
285     
286     ProcInitialized = TRUE;
287 }
288
289 /*
290  * ProcReleaseLocks() -- release all locks associated with this process
291  *
292  */
293 void
294 ProcReleaseLocks()
295 {
296     if (!MyProc)
297         return;
298     LockReleaseAll(1,&MyProc->lockQueue);
299 }
300
301 /*
302  * ProcRemove -
303  *    used by the postmaster to clean up the global tables. This also frees
304  *    up the semaphore used for the lmgr of the process. (We have to do
305  *    this is the postmaster instead of doing a IpcSemaphoreKill on exiting
306  *    the process because the semaphore set is shared among backends and
307  *    we don't want to remove other's semaphores on exit.)
308  */
309 bool
310 ProcRemove(int pid)
311 {
312     SHMEM_OFFSET  location;
313     PROC *proc;
314     
315     location = INVALID_OFFSET;
316     
317     location = ShmemPIDDestroy(pid);
318     if (location == INVALID_OFFSET)
319         return(FALSE);
320     proc = (PROC *) MAKE_PTR(location);
321
322     SpinAcquire(ProcStructLock);
323     
324     ProcFreeSem(proc->sem.semKey, proc->sem.semNum);
325
326     proc->links.next =  ProcGlobal->freeProcs;
327     ProcGlobal->freeProcs = MAKE_OFFSET(proc);
328     
329     SpinRelease(ProcStructLock);
330
331     return(TRUE);
332 }
333
334 /*
335  * ProcKill() -- Destroy the per-proc data structure for
336  *      this process. Release any of its held spin locks.
337  */
338 static void
339 ProcKill(int exitStatus, int pid)
340 {
341     PROC                *proc;
342     SHMEM_OFFSET        location;
343     
344     /* -------------------- 
345      * If this is a FATAL exit the postmaster will have to kill all the
346      * existing backends and reinitialize shared memory.  So all we don't 
347      * need to do anything here.
348      * --------------------
349      */
350     if (exitStatus != 0)
351         return;
352     
353     if (! pid)
354         {
355             pid = getpid();
356         }
357     
358     ShmemPIDLookup(pid,&location);
359     if (location == INVALID_OFFSET)
360         return;
361     
362     proc = (PROC *) MAKE_PTR(location);
363     
364     if (proc != MyProc) {
365         Assert( pid != getpid() );
366     } else
367         MyProc = NULL;
368     
369     /* ---------------
370      * Assume one lock table.
371      * ---------------
372      */
373     ProcReleaseSpins(proc);
374     LockReleaseAll(1,&proc->lockQueue);
375     
376 #ifdef USER_LOCKS
377     LockReleaseAll(0,&proc->lockQueue);
378 #endif
379
380     /* ----------------
381      * get off the wait queue
382      * ----------------
383      */
384     LockLockTable();
385     if (proc->links.next != INVALID_OFFSET) {
386         Assert(proc->waitLock->waitProcs.size > 0);
387         SHMQueueDelete(&(proc->links));
388         --proc->waitLock->waitProcs.size;
389     }
390     SHMQueueElemInit(&(proc->links));
391     UnlockLockTable();
392     
393     return;
394 }
395
396 /*
397  * ProcQueue package: routines for putting processes to sleep
398  *      and  waking them up
399  */
400
401 /*
402  * ProcQueueAlloc -- alloc/attach to a shared memory process queue
403  *
404  * Returns: a pointer to the queue or NULL
405  * Side Effects: Initializes the queue if we allocated one
406  */
407 #ifdef NOT_USED
408 PROC_QUEUE *
409 ProcQueueAlloc(char *name)
410 {
411     bool        found;
412     PROC_QUEUE *queue = (PROC_QUEUE *)
413         ShmemInitStruct(name,(unsigned)sizeof(PROC_QUEUE),&found);
414     
415     if (! queue)
416         {
417             return(NULL);
418         }
419     if (! found)
420         {
421             ProcQueueInit(queue);
422         }
423     return(queue);
424 }
425 #endif
426
427 /*
428  * ProcQueueInit -- initialize a shared memory process queue
429  */
430 void
431 ProcQueueInit(PROC_QUEUE *queue)
432 {
433     SHMQueueInit(&(queue->links));
434     queue->size = 0;
435 }
436
437
438
439 /*
440  * ProcSleep -- put a process to sleep
441  *
442  * P() on the semaphore should put us to sleep.  The process
443  * semaphore is cleared by default, so the first time we try
444  * to acquire it, we sleep.
445  *
446  * ASSUME: that no one will fiddle with the queue until after
447  *      we release the spin lock.
448  *
449  * NOTES: The process queue is now a priority queue for locking.
450  */
451 int
452 ProcSleep(PROC_QUEUE *queue,
453           SPINLOCK spinlock,
454           int token,
455           int prio,
456           LOCK *lock)
457 {
458     int         i;
459     PROC        *proc;
460     struct itimerval timeval, dummy;
461     
462     proc = (PROC *) MAKE_PTR(queue->links.prev);
463     for (i=0;i<queue->size;i++)
464         {
465             if (proc->prio < prio)
466                 proc = (PROC *) MAKE_PTR(proc->links.prev);
467             else
468                 break;
469         }
470     
471     MyProc->prio = prio;
472     MyProc->token = token;
473     MyProc->waitLock = lock;
474     
475     /* -------------------
476      * currently, we only need this for the ProcWakeup routines
477      * -------------------
478      */
479     TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
480     
481     /* -------------------
482      * assume that these two operations are atomic (because
483      * of the spinlock).
484      * -------------------
485      */
486     SHMQueueInsertTL(&(proc->links),&(MyProc->links));
487     queue->size++;
488     
489     SpinRelease(spinlock);
490     
491     /* --------------
492      * Postgres does not have any deadlock detection code and for this 
493      * reason we must set a timer to wake up the process in the event of
494      * a deadlock.  For now the timer is set for 1 minute and we assume that
495      * any process which sleeps for this amount of time is deadlocked and will 
496      * receive a SIGALRM signal.  The handler should release the processes
497      * semaphore and abort the current transaction.
498      *
499      * Need to zero out struct to set the interval and the micro seconds fields
500      * to 0.
501      * --------------
502      */
503     memset(&timeval, 0, sizeof(struct itimerval));
504     timeval.it_value.tv_sec = DEADLOCK_TIMEOUT;
505     
506     if (setitimer(ITIMER_REAL, &timeval, &dummy))
507         elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
508     
509     /* --------------
510      * if someone wakes us between SpinRelease and IpcSemaphoreLock,
511      * IpcSemaphoreLock will not block.  The wakeup is "saved" by
512      * the semaphore implementation.
513      * --------------
514      */
515     IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
516     
517     /* ---------------
518      * We were awoken before a timeout - now disable the timer
519      * ---------------
520      */
521     timeval.it_value.tv_sec = 0;
522     
523     
524     if (setitimer(ITIMER_REAL, &timeval, &dummy))
525         elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup");
526     
527     /* ----------------
528      * We were assumed to be in a critical section when we went
529      * to sleep.
530      * ----------------
531      */
532     SpinAcquire(spinlock);
533     
534     return(MyProc->errType);
535 }
536
537
538 /*
539  * ProcWakeup -- wake up a process by releasing its private semaphore.
540  *
541  *   remove the process from the wait queue and set its links invalid.
542  *   RETURN: the next process in the wait queue.
543  */
544 static PROC *
545 ProcWakeup(PROC *proc, int errType)
546 {
547     PROC *retProc;
548     /* assume that spinlock has been acquired */
549     
550     if (proc->links.prev == INVALID_OFFSET ||
551         proc->links.next == INVALID_OFFSET)
552         return((PROC *) NULL);
553     
554     retProc = (PROC *) MAKE_PTR(proc->links.prev);
555     
556     /* you have to update waitLock->waitProcs.size yourself */
557     SHMQueueDelete(&(proc->links));
558     SHMQueueElemInit(&(proc->links));
559     
560     proc->errType = errType;
561     
562     IpcSemaphoreUnlock(proc->sem.semId, proc->sem.semNum, IpcExclusiveLock);
563     
564     return retProc;
565 }
566
567
568 /*
569  * ProcGetId --
570  */
571 #ifdef NOT_USED
572 int
573 ProcGetId()
574 {
575     return( MyProc->procId );
576 }
577 #endif
578
579 /*
580  * ProcLockWakeup -- routine for waking up processes when a lock is
581  *      released.
582  */
583 int
584 ProcLockWakeup(PROC_QUEUE *queue, char *ltable, char *lock)
585 {
586     PROC        *proc;
587     int count;
588     
589     if (! queue->size)
590         return(STATUS_NOT_FOUND);
591     
592     proc = (PROC *) MAKE_PTR(queue->links.prev);
593     count = 0;
594     while ((LockResolveConflicts ((LOCKTAB *) ltable,
595                                   (LOCK *) lock,
596                                   proc->token,
597                                   proc->xid) == STATUS_OK))
598         {
599             /* there was a waiting process, grant it the lock before waking it
600              * up.  This will prevent another process from seizing the lock
601              * between the time we release the lock master (spinlock) and
602              * the time that the awoken process begins executing again.
603              */
604             GrantLock((LOCK *) lock, proc->token);
605             queue->size--;
606             
607             /*
608              * ProcWakeup removes proc from the lock waiting process queue and
609              * returns the next proc in chain.  If a writer just dropped
610              * its lock and there are several waiting readers, wake them all up.
611              */
612             proc = ProcWakeup(proc, NO_ERROR);
613             
614             count++;
615             if (!proc || queue->size == 0)
616                 break;
617         }
618     
619     if (count)
620         return(STATUS_OK);
621     else
622         /* Something is still blocking us.  May have deadlocked. */
623         return(STATUS_NOT_FOUND);
624 }
625
626 void
627 ProcAddLock(SHM_QUEUE *elem)
628 {
629     SHMQueueInsertTL(&MyProc->lockQueue,elem);
630 }
631
632 /* --------------------
633  * We only get to this routine if we got SIGALRM after DEADLOCK_TIMEOUT
634  * while waiting for a lock to be released by some other process.  After
635  * the one minute deadline we assume we have a deadlock and must abort
636  * this transaction.  We must also indicate that I'm no longer waiting
637  * on a lock so that other processes don't try to wake me up and screw 
638  * up my semaphore.
639  * --------------------
640  */
641 static void
642 HandleDeadLock(int sig)
643 {
644     LOCK *lock;
645     int size;
646     
647     LockLockTable();
648     
649     /* ---------------------
650      * Check to see if we've been awoken by anyone in the interim.
651      *
652      * If we have we can return and resume our transaction -- happy day.
653      * Before we are awoken the process releasing the lock grants it to
654      * us so we know that we don't have to wait anymore.
655      * 
656      * Damn these names are LONG! -mer
657      * ---------------------
658      */
659     if (IpcSemaphoreGetCount(MyProc->sem.semId, MyProc->sem.semNum) == 
660         IpcSemaphoreDefaultStartValue) {
661         UnlockLockTable();
662         return;
663     }
664     
665     /*
666      * you would think this would be unnecessary, but...
667      *
668      * this also means we've been removed already.  in some ports
669      * (e.g., sparc and aix) the semop(2) implementation is such that
670      * we can actually end up in this handler after someone has removed
671      * us from the queue and bopped the semaphore *but the test above
672      * fails to detect the semaphore update* (presumably something weird
673      * having to do with the order in which the semaphore wakeup signal
674      * and SIGALRM get handled).
675      */
676     if (MyProc->links.prev == INVALID_OFFSET ||
677         MyProc->links.next == INVALID_OFFSET) {
678         UnlockLockTable();
679         return;
680     }
681     
682     lock = MyProc->waitLock;
683     size = lock->waitProcs.size; /* so we can look at this in the core */
684     
685 #ifdef DEADLOCK_DEBUG
686     DumpLocks();
687 #endif
688
689     /* ------------------------
690      * Get this process off the lock's wait queue
691      * ------------------------
692      */
693     Assert(lock->waitProcs.size > 0);
694     --lock->waitProcs.size;
695     SHMQueueDelete(&(MyProc->links));
696     SHMQueueElemInit(&(MyProc->links));
697     
698     /* ------------------
699      * Unlock my semaphore so that the count is right for next time.
700      * I was awoken by a signal, not by someone unlocking my semaphore.
701      * ------------------
702      */
703     IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
704     
705     /* -------------
706      * Set MyProc->errType to STATUS_ERROR so that we abort after
707      * returning from this handler.
708      * -------------
709      */
710     MyProc->errType = STATUS_ERROR;
711     
712     /*
713      * if this doesn't follow the IpcSemaphoreUnlock then we get lock
714      * table corruption ("LockReplace: xid table corrupted") due to
715      * race conditions.  i don't claim to understand this...
716      */
717     UnlockLockTable();
718     
719     elog(NOTICE, "Timeout -- possible deadlock");
720     return;
721 }
722
723 void
724 ProcReleaseSpins(PROC *proc)
725 {
726     int i;
727     
728     if (!proc)
729         proc = MyProc;
730     
731     if (!proc)
732         return;
733     for (i=0; i < (int)MAX_SPINS; i++)
734         {
735             if (proc->sLocks[i])
736                 {
737                     Assert(proc->sLocks[i] == 1);
738                     SpinRelease(i);
739                 }
740         }
741 }
742
743 /*****************************************************************************
744  * 
745  *****************************************************************************/
746
747 /*
748  * ProcGetNewSemKeyAndNum -
749  *    scan the free semaphore bitmap and allocate a single semaphore from
750  *    a semaphore set. (If the semaphore set doesn't exist yet,
751  *    IpcSemaphoreCreate will create it. Otherwise, we use the existing
752  *    semaphore set.)
753  */
754 static void
755 ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum)
756 {
757     int i;
758     int32 *freeSemMap = ProcGlobal->freeSemMap;
759     unsigned int fullmask;
760
761     /*
762      * we hold ProcStructLock when entering this routine. We scan through
763      * the bitmap to look for a free semaphore.
764      */
765     fullmask = ~0 >> (32 - PROC_NSEMS_PER_SET);
766     for(i=0; i < MAX_PROC_SEMS/PROC_NSEMS_PER_SET; i++) {
767         int mask = 1;
768         int j;
769
770         if (freeSemMap[i] == fullmask)
771             continue; /* none free for this set */
772
773         for(j = 0; j < PROC_NSEMS_PER_SET; j++) {
774             if ((freeSemMap[i] & mask) == 0) {
775                 /*
776                  * a free semaphore found. Mark it as allocated.
777                  */
778                 freeSemMap[i] |= mask;
779
780                 *key = ProcGlobal->currKey + i;
781                 *semNum = j;
782                 return;
783             }
784             mask <<= 1;
785         }
786     }
787
788     /* if we reach here, all the semaphores are in use. */
789     elog(WARN, "InitProc: cannot allocate a free semaphore");
790 }
791
792 /*
793  * ProcFreeSem -
794  *    free up our semaphore in the semaphore set. If we're the last one
795  *    in the set, also remove the semaphore set.
796  */
797 static void
798 ProcFreeSem(IpcSemaphoreKey semKey, int semNum)
799 {
800     int mask;
801     int i;
802     int32 *freeSemMap = ProcGlobal->freeSemMap;
803
804     i = semKey - ProcGlobal->currKey;
805     mask = ~(1 << semNum);
806     freeSemMap[i] &= mask;
807
808     if (freeSemMap[i]==0)
809         IpcSemaphoreKill(semKey);
810 }
811
812 /*
813  * ProcFreeAllSemaphores -
814  *    on exiting the postmaster, we free up all the semaphores allocated
815  *    to the lmgrs of the backends.
816  */
817 void
818 ProcFreeAllSemaphores()
819 {
820     int i;
821     int32 *freeSemMap = ProcGlobal->freeSemMap;
822
823     for(i=0; i < MAX_PROC_SEMS/PROC_NSEMS_PER_SET; i++) {
824         if (freeSemMap[i]!=0)
825             IpcSemaphoreKill(ProcGlobal->currKey + i);
826     }
827 }