/*-------------------------------------------------------------------------
*
- * proc.c--
+ * proc.c
* routines to manage per-process shared memory data structure
*
- * Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.34 1998/02/26 04:36:12 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.80 2000/10/02 19:42:48 petere Exp $
*
*-------------------------------------------------------------------------
*/
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.34 1998/02/26 04:36:12 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.80 2000/10/02 19:42:48 petere Exp $
*/
+#include "postgres.h"
+
#include <sys/time.h>
#include <unistd.h>
-#include <string.h>
#include <signal.h>
#include <sys/types.h>
-#if defined(sparc_solaris)
+#if defined(solaris_sparc) || defined(__CYGWIN__)
#include <sys/ipc.h>
#include <sys/sem.h>
#endif
-#include "postgres.h"
#include "miscadmin.h"
-#include "libpq/pqsignal.h"
-#include "access/xact.h"
-#include "utils/hsearch.h"
-#include "storage/ipc.h"
-/* In Ultrix, sem.h must be included after ipc.h */
+/* In Ultrix and QNX, sem.h must be included after ipc.h */
#include <sys/sem.h>
-#include "storage/buf.h"
-#include "storage/lock.h"
-#include "storage/lmgr.h"
-#include "storage/shmem.h"
-#include "storage/spin.h"
+
#include "storage/proc.h"
-static void HandleDeadLock(int sig);
-static PROC *ProcWakeup(PROC *proc, int errType);
+void HandleDeadLock(SIGNAL_ARGS);
+static void ProcFreeAllSemaphores(void);
+static bool GetOffWaitqueue(PROC *);
+
+int DeadlockTimeout = 1000;
/* --------------------
* Spin lock for manipulating the shared process data structure:
*/
SPINLOCK ProcStructLock;
-/*
- * For cleanup routines. Don't cleanup if the initialization
- * has not happened.
- */
-static bool ProcInitialized = FALSE;
-
static PROC_HDR *ProcGlobal = NULL;
PROC *MyProc = NULL;
static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
+static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause.";
+
/*
* InitProcGlobal -
* initializes the global process table. We put it here so that
- * the postmaster can do this initialization. (ProcFreeAllSem needs
+ * the postmaster can do this initialization. (ProcFreeAllSemaphores needs
* to read this table on exiting the postmaster. If we have the first
* backend do this, starting up and killing the postmaster without
* starting any backends will be a problem.)
+ *
+ * We also allocate all the per-process semaphores we will need to support
+ * the requested number of backends. We used to allocate semaphores
+ * only when backends were actually started up, but that is bad because
+ * it lets Postgres fail under load --- a lot of Unix systems are
+ * (mis)configured with small limits on the number of semaphores, and
+ * running out when trying to start another backend is a common failure.
+ * So, now we grab enough semaphores to support the desired max number
+ * of backends immediately at initialization --- if the sysadmin has set
+ * MaxBackends higher than his kernel will support, he'll find out sooner
+ * rather than later.
*/
void
-InitProcGlobal(IPCKey key)
+InitProcGlobal(IPCKey key, int maxBackends)
{
bool found = false;
/* attach to the free list */
ProcGlobal = (PROC_HDR *)
- ShmemInitStruct("Proc Header", (unsigned) sizeof(PROC_HDR), &found);
+ ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found);
/* --------------------
* We're the first - initialize.
+ * XXX if found should ever be true, it is a sign of impending doom ...
+ * ought to complain if so?
* --------------------
*/
if (!found)
ProcGlobal->currKey = IPCGetProcessSemaphoreInitKey(key);
for (i = 0; i < MAX_PROC_SEMS / PROC_NSEMS_PER_SET; i++)
ProcGlobal->freeSemMap[i] = 0;
+
+ /*
+ * Arrange to delete semas on exit --- set this up now so that we
+ * will clean up if pre-allocation fails...
+ */
+ on_shmem_exit(ProcFreeAllSemaphores, 0);
+
+ /*
+ * Pre-create the semaphores for the first maxBackends processes,
+ * unless we are running as a standalone backend.
+ */
+ if (key != PrivateIPCKey)
+ {
+ for (i = 0;
+ i < (maxBackends + PROC_NSEMS_PER_SET - 1) / PROC_NSEMS_PER_SET;
+ i++)
+ {
+ IPCKey semKey = ProcGlobal->currKey + i;
+ int semId;
+
+ semId = IpcSemaphoreCreate(semKey,
+ PROC_NSEMS_PER_SET,
+ IPCProtection,
+ IpcSemaphoreDefaultStartValue,
+ 0);
+ if (semId < 0)
+ elog(FATAL, "InitProcGlobal: IpcSemaphoreCreate failed");
+ /* mark this sema set allocated */
+ ProcGlobal->freeSemMap[i] = (1 << PROC_NSEMS_PER_SET);
+ }
+ }
}
}
InitProcess(IPCKey key)
{
bool found = false;
- int semstat;
unsigned long location,
myOffset;
- /* ------------------
- * Routine called if deadlock timer goes off. See ProcSleep()
- * ------------------
- */
- pqsignal(SIGALRM, HandleDeadLock);
-
SpinAcquire(ProcStructLock);
/* attach to the free list */
ProcGlobal = (PROC_HDR *)
- ShmemInitStruct("Proc Header", (unsigned) sizeof(PROC_HDR), &found);
+ ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found);
if (!found)
{
/* this should not happen. InitProcGlobal() is called before this. */
- elog(ERROR, "InitProcess: Proc Header uninitialized");
+ elog(STOP, "InitProcess: Proc Header uninitialized");
}
if (MyProc != NULL)
{
/*
- * have to allocate one. We can't use the normal binding table
- * mechanism because the proc structure is stored by PID instead
- * of by a global name (need to look it up by PID when we cleanup
- * dead processes).
+ * have to allocate one. We can't use the normal shmem index
+ * table mechanism because the proc structure is stored by PID
+ * instead of by a global name (need to look it up by PID when we
+ * cleanup dead processes).
*/
- MyProc = (PROC *) ShmemAlloc((unsigned) sizeof(PROC));
+ MyProc = (PROC *) ShmemAlloc(sizeof(PROC));
if (!MyProc)
{
SpinRelease(ProcStructLock);
ProcGetNewSemKeyAndNum(&semKey, &semNum);
+ /*
+ * Note: because of the pre-allocation done in InitProcGlobal,
+ * this call should always attach to an existing semaphore. It
+ * will (try to) create a new group of semaphores only if the
+ * postmaster tries to start more backends than it said it would.
+ */
semId = IpcSemaphoreCreate(semKey,
PROC_NSEMS_PER_SET,
IPCProtection,
IpcSemaphoreDefaultStartValue,
- 0,
- &semstat);
+ 0);
/*
* we might be reusing a semaphore that belongs to a dead backend.
MyProc->sem.semKey = semKey;
}
else
- {
MyProc->sem.semId = -1;
- }
/* ----------------------
* Release the lock.
*/
SpinRelease(ProcStructLock);
- MyProc->pid = 0;
-#if 0
MyProc->pid = MyProcPid;
-#endif
+ MyProc->databaseId = MyDatabaseId;
MyProc->xid = InvalidTransactionId;
+ MyProc->xmin = InvalidTransactionId;
/* ----------------
* Start keeping spin lock stats from here on. Any botch before
MemSet(MyProc->sLocks, 0, MAX_SPINS * sizeof(*MyProc->sLocks));
/* -------------------------
- * Install ourselves in the binding table. The name to
+ * Install ourselves in the shmem index table. The name to
* use is determined by the OS-assigned process id. That
* allows the cleanup process to find us after any untimely
* exit.
*/
location = MAKE_OFFSET(MyProc);
if ((!ShmemPIDLookup(MyProcPid, &location)) || (location != MAKE_OFFSET(MyProc)))
- {
- elog(FATAL, "InitProc: ShmemPID table broken");
- }
+ elog(STOP, "InitProc: ShmemPID table broken");
MyProc->errType = NO_ERROR;
SHMQueueElemInit(&(MyProc->links));
- on_exitpg(ProcKill, (caddr_t) MyProcPid);
+ on_shmem_exit(ProcKill, (Datum) MyProcPid);
+}
+
+/* -----------------------
+ * get off the wait queue
+ * -----------------------
+ */
+static bool
+GetOffWaitqueue(PROC *proc)
+{
+ bool getoffed = false;
+
+ LockLockTable();
+ if (proc->links.next != INVALID_OFFSET)
+ {
+ int lockmode = proc->token;
+ LOCK *waitLock = proc->waitLock;
+
+ Assert(waitLock);
+ Assert(waitLock->waitProcs.size > 0);
+ SHMQueueDelete(&(proc->links));
+ --waitLock->waitProcs.size;
+ Assert(waitLock->nHolding > 0);
+ Assert(waitLock->nHolding > proc->waitLock->nActive);
+ --waitLock->nHolding;
+ Assert(waitLock->holders[lockmode] > 0);
+ --waitLock->holders[lockmode];
+ if (waitLock->activeHolders[lockmode] == waitLock->holders[lockmode])
+ waitLock->waitMask &= ~(1 << lockmode);
+ ProcLockWakeup(&(waitLock->waitProcs), LOCK_LOCKMETHOD(*waitLock), waitLock);
+ getoffed = true;
+ }
+ SHMQueueElemInit(&(proc->links));
+ UnlockLockTable();
- ProcInitialized = TRUE;
+ return getoffed;
}
/*
if (!MyProc)
return;
LockReleaseAll(1, &MyProc->lockQueue);
+ GetOffWaitqueue(MyProc);
}
/*
location = ShmemPIDDestroy(pid);
if (location == INVALID_OFFSET)
- return (FALSE);
+ return FALSE;
proc = (PROC *) MAKE_PTR(location);
SpinAcquire(ProcStructLock);
SpinRelease(ProcStructLock);
- return (TRUE);
+ return TRUE;
}
/*
proc = (PROC *) MAKE_PTR(location);
- if (proc != MyProc)
- {
- Assert(pid != MyProcPid);
- }
- else
- MyProc = NULL;
+ Assert(proc == MyProc || pid != MyProcPid);
+
+ MyProc = NULL;
/* ---------------
* Assume one lock table.
* ---------------
*/
ProcReleaseSpins(proc);
- LockReleaseAll(1, &proc->lockQueue);
+ LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
#ifdef USER_LOCKS
- LockReleaseAll(0, &proc->lockQueue);
+
+ /*
+ * Assume we have a second lock table.
+ */
+ LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
#endif
/* ----------------
* get off the wait queue
* ----------------
*/
- LockLockTable();
- if (proc->links.next != INVALID_OFFSET)
- {
- Assert(proc->waitLock->waitProcs.size > 0);
- SHMQueueDelete(&(proc->links));
- --proc->waitLock->waitProcs.size;
- }
- SHMQueueElemInit(&(proc->links));
- UnlockLockTable();
+ GetOffWaitqueue(proc);
return;
}
{
bool found;
PROC_QUEUE *queue = (PROC_QUEUE *)
- ShmemInitStruct(name, (unsigned) sizeof(PROC_QUEUE), &found);
+ ShmemInitStruct(name, sizeof(PROC_QUEUE), &found);
if (!queue)
- {
- return (NULL);
- }
+ return NULL;
if (!found)
- {
ProcQueueInit(queue);
- }
- return (queue);
+ return queue;
}
#endif
}
+/*
+ * Handling cancel request while waiting for lock
+ *
+ */
+static bool lockWaiting = false;
+void
+SetWaitingForLock(bool waiting)
+{
+ if (waiting == lockWaiting)
+ return;
+ lockWaiting = waiting;
+ if (lockWaiting)
+ {
+ /* The lock was already released ? */
+ if (MyProc->links.next == INVALID_OFFSET)
+ {
+ lockWaiting = false;
+ return;
+ }
+ if (QueryCancel) /* cancel request pending */
+ {
+ if (GetOffWaitqueue(MyProc))
+ {
+ lockWaiting = false;
+ elog(ERROR, "Query cancel requested while waiting lock");
+ }
+ }
+ }
+}
+void
+LockWaitCancel(void)
+{
+ struct itimerval timeval,
+ dummy;
+
+ if (!lockWaiting)
+ return;
+ lockWaiting = false;
+ /* Deadlock timer off */
+ MemSet(&timeval, 0, sizeof(struct itimerval));
+ setitimer(ITIMER_REAL, &timeval, &dummy);
+ if (GetOffWaitqueue(MyProc))
+ elog(ERROR, "Query cancel requested while waiting lock");
+}
/*
* ProcSleep -- put a process to sleep
* NOTES: The process queue is now a priority queue for locking.
*/
int
-ProcSleep(PROC_QUEUE *waitQueue,
- SPINLOCK spinlock,
- int token,
- int prio,
+ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
+ LOCKMETHODCTL *lockctl,
+ int token, /* lockmode */
LOCK *lock)
{
int i;
+ SPINLOCK spinlock = lockctl->masterLock;
PROC *proc;
+ int myMask = (1 << token);
+ int waitMask = lock->waitMask;
+ int aheadHolders[MAX_LOCKMODES];
+ bool selfConflict = (lockctl->conflictTab[token] & myMask),
+ prevSame = false;
+ bool deadlock_checked = false;
struct itimerval timeval,
dummy;
- /*
- * If the first entries in the waitQueue have a greater priority than
- * we have, we must be a reader, and they must be a writers, and we
- * must be here because the current holder is a writer or a reader but
- * we don't share shared locks if a writer is waiting. We put
- * ourselves after the writers. This way, we have a FIFO, but keep
- * the readers together to give them decent priority, and no one
- * starves. Because we group all readers together, a non-empty queue
- * only has a few possible configurations:
- *
- * [readers] [writers] [readers][writers] [writers][readers]
- * [writers][readers][writers]
- *
- * In a full queue, we would have a reader holding a lock, then a writer
- * gets the lock, then a bunch of readers, made up of readers who
- * could not share the first readlock because a writer was waiting,
- * and new readers arriving while the writer had the lock.
- *
- */
+ MyProc->token = token;
+ MyProc->waitLock = lock;
+
proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
- /* If we are a reader, and they are writers, skip past them */
+ /* if we don't conflict with any waiter - be first in queue */
+ if (!(lockctl->conflictTab[token] & waitMask))
+ goto ins;
- for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
- proc = (PROC *) MAKE_PTR(proc->links.prev);
+ for (i = 1; i < MAX_LOCKMODES; i++)
+ aheadHolders[i] = lock->activeHolders[i];
+ (aheadHolders[token])++;
- /* The rest of the queue is FIFO, with readers first, writers last */
- for (; i < waitQueue->size && proc->prio <= prio; i++)
- proc = (PROC *) MAKE_PTR(proc->links.prev);
+ for (i = 0; i < waitQueue->size; i++)
+ {
+ /* am I waiting for him ? */
+ if (lockctl->conflictTab[token] & proc->holdLock)
+ {
+ /* is he waiting for me ? */
+ if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ {
+ MyProc->errType = STATUS_ERROR;
+ elog(NOTICE, DeadLockMessage);
+ goto rt;
+ }
+ /* being waiting for him - go past */
+ }
+ /* if he waits for me */
+ else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ break;
+ /* if conflicting locks requested */
+ else if (lockctl->conflictTab[proc->token] & myMask)
+ {
- MyProc->prio = prio;
- MyProc->token = token;
- MyProc->waitLock = lock;
+ /*
+ * If I request non self-conflicting lock and there are others
+ * requesting the same lock just before me - stay here.
+ */
+ if (!selfConflict && prevSame)
+ break;
+ }
- /* -------------------
- * currently, we only need this for the ProcWakeup routines
- * -------------------
- */
- TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
+ /*
+ * Last attempt to don't move any more: if we don't conflict with
+ * rest waiters in queue.
+ */
+ else if (!(lockctl->conflictTab[token] & waitMask))
+ break;
+ prevSame = (proc->token == token);
+ (aheadHolders[proc->token])++;
+ if (aheadHolders[proc->token] == lock->holders[proc->token])
+ waitMask &= ~(1 << proc->token);
+ proc = (PROC *) MAKE_PTR(proc->links.prev);
+ }
+
+ins:;
/* -------------------
* assume that these two operations are atomic (because
* of the spinlock).
SHMQueueInsertTL(&(proc->links), &(MyProc->links));
waitQueue->size++;
+ lock->waitMask |= myMask;
SpinRelease(spinlock);
/* --------------
* --------------
*/
MemSet(&timeval, 0, sizeof(struct itimerval));
- timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER;
+ timeval.it_value.tv_sec = DeadlockTimeout / 1000;
+ timeval.it_value.tv_usec = (DeadlockTimeout % 1000) * 1000;
+ SetWaitingForLock(true);
do
{
MyProc->errType = NO_ERROR; /* reset flag after deadlock check */
- if (setitimer(ITIMER_REAL, &timeval, &dummy))
- elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
+ if (!deadlock_checked)
+ if (setitimer(ITIMER_REAL, &timeval, &dummy))
+ elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
+ deadlock_checked = true;
/* --------------
* if someone wakes us between SpinRelease and IpcSemaphoreLock,
* the semaphore implementation.
* --------------
*/
- IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+ IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum,
+ IpcExclusiveLock);
} while (MyProc->errType == STATUS_NOT_FOUND); /* sleep after deadlock
* check */
+ lockWaiting = false;
/* ---------------
* We were awoken before a timeout - now disable the timer
* ---------------
*/
timeval.it_value.tv_sec = 0;
-
-
+ timeval.it_value.tv_usec = 0;
if (setitimer(ITIMER_REAL, &timeval, &dummy))
elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup");
*/
SpinAcquire(spinlock);
- return (MyProc->errType);
+rt:;
+
+#ifdef LOCK_DEBUG
+ /* Just to get meaningful debug messages from DumpLocks() */
+ MyProc->waitLock = (LOCK *) NULL;
+#endif
+
+ return MyProc->errType;
}
* remove the process from the wait queue and set its links invalid.
* RETURN: the next process in the wait queue.
*/
-static PROC *
+PROC *
ProcWakeup(PROC *proc, int errType)
{
PROC *retProc;
if (proc->links.prev == INVALID_OFFSET ||
proc->links.next == INVALID_OFFSET)
- return ((PROC *) NULL);
+ return (PROC *) NULL;
retProc = (PROC *) MAKE_PTR(proc->links.prev);
* released.
*/
int
-ProcLockWakeup(PROC_QUEUE *queue, char *ltable, char *lock)
+ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
{
PROC *proc;
- int count;
+ int count = 0;
+ int last_locktype = 0;
+ int queue_size = queue->size;
+
+ Assert(queue->size >= 0);
if (!queue->size)
- return (STATUS_NOT_FOUND);
+ return STATUS_NOT_FOUND;
proc = (PROC *) MAKE_PTR(queue->links.prev);
- count = 0;
- while ((LockResolveConflicts((LOCKTAB *) ltable,
- (LOCK *) lock,
- proc->token,
- proc->xid) == STATUS_OK))
+ while ((queue_size--) && (proc))
{
+ /*
+ * This proc will conflict as the previous one did, don't even
+ * try.
+ */
+ if (proc->token == last_locktype)
+ continue;
+
+ /*
+ * Does this proc conflict with locks held by others ?
+ */
+ if (LockResolveConflicts(lockmethod,
+ lock,
+ proc->token,
+ proc->xid,
+ (XIDLookupEnt *) NULL) != STATUS_OK)
+ {
+ if (count != 0)
+ break;
+ last_locktype = proc->token;
+ continue;
+ }
+
/*
* there was a waiting process, grant it the lock before waking it
* up. This will prevent another process from seizing the lock
* between the time we release the lock master (spinlock) and the
* time that the awoken process begins executing again.
*/
- GrantLock((LOCK *) lock, proc->token);
- queue->size--;
+ GrantLock(lock, proc->token);
/*
* ProcWakeup removes proc from the lock waiting process queue and
* returns the next proc in chain.
*/
- proc = ProcWakeup(proc, NO_ERROR);
count++;
- if (!proc || queue->size == 0)
- break;
+ queue->size--;
+ proc = ProcWakeup(proc, NO_ERROR);
}
+ Assert(queue->size >= 0);
+
if (count)
- return (STATUS_OK);
+ return STATUS_OK;
else
+ {
/* Something is still blocking us. May have deadlocked. */
- return (STATUS_NOT_FOUND);
+#ifdef LOCK_DEBUG
+ if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+ {
+ elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", MAKE_OFFSET(lock));
+ if (Debug_deadlocks)
+ DumpAllLocks();
+ }
+#endif
+ return STATUS_NOT_FOUND;
+ }
}
void
}
/* --------------------
- * We only get to this routine if we got SIGALRM after DEADLOCK_CHECK_TIMER
+ * We only get to this routine if we got SIGALRM after DeadlockTimeout
* while waiting for a lock to be released by some other process. If we have
* a real deadlock, we must also indicate that I'm no longer waiting
* on a lock so that other processes don't try to wake me up and screw
* up my semaphore.
* --------------------
*/
-static void
-HandleDeadLock(int sig)
+void
+HandleDeadLock(SIGNAL_ARGS)
{
LOCK *mywaitlock;
return;
}
-#ifdef DEADLOCK_DEBUG
- DumpLocks();
+#ifdef LOCK_DEBUG
+ if (Debug_deadlocks)
+ DumpAllLocks();
#endif
- if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
+ MyProc->errType = STATUS_NOT_FOUND;
+ if (!DeadLockCheck(MyProc, MyProc->waitLock))
{
UnlockLockTable();
- MyProc->errType = STATUS_NOT_FOUND;
return;
}
* ------------------------
*/
Assert(mywaitlock->waitProcs.size > 0);
+ lockWaiting = false;
--mywaitlock->waitProcs.size;
SHMQueueDelete(&(MyProc->links));
SHMQueueElemInit(&(MyProc->links));
* I was awoken by a signal, not by someone unlocking my semaphore.
* ------------------
*/
- IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+ IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum,
+ IpcExclusiveLock);
/* -------------
* Set MyProc->errType to STATUS_ERROR so that we abort after
*/
UnlockLockTable();
- elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause.");
+ elog(NOTICE, DeadLockMessage);
return;
}
SpinRelease(i);
}
}
+ AbortBufferIO();
}
/*****************************************************************************
{
int i;
int32 *freeSemMap = ProcGlobal->freeSemMap;
- unsigned int fullmask;
+ int32 fullmask = (1 << (PROC_NSEMS_PER_SET + 1)) - 1;
/*
* we hold ProcStructLock when entering this routine. We scan through
* the bitmap to look for a free semaphore.
*/
- fullmask = ~0 >> (32 - PROC_NSEMS_PER_SET);
+
for (i = 0; i < MAX_PROC_SEMS / PROC_NSEMS_PER_SET; i++)
{
int mask = 1;
int j;
if (freeSemMap[i] == fullmask)
- continue; /* none free for this set */
+ continue; /* this set is fully allocated */
for (j = 0; j < PROC_NSEMS_PER_SET; j++)
{
{
/*
- * a free semaphore found. Mark it as allocated.
+ * a free semaphore found. Mark it as allocated. Also set
+ * the bit indicating whole set is allocated.
*/
- freeSemMap[i] |= mask;
+ freeSemMap[i] |= mask + (1 << PROC_NSEMS_PER_SET);
*key = ProcGlobal->currKey + i;
*semNum = j;
/*
* ProcFreeSem -
- * free up our semaphore in the semaphore set. If we're the last one
- * in the set, also remove the semaphore set.
+ * free up our semaphore in the semaphore set.
*/
static void
ProcFreeSem(IpcSemaphoreKey semKey, int semNum)
mask = ~(1 << semNum);
freeSemMap[i] &= mask;
- if (freeSemMap[i] == 0)
- IpcSemaphoreKill(semKey);
+ /*
+ * Formerly we'd release a semaphore set if it was now completely
+ * unused, but now we keep the semaphores to ensure we won't run out
+ * when starting new backends --- cf. InitProcGlobal. Note that the
+ * PROC_NSEMS_PER_SET+1'st bit of the freeSemMap entry remains set to
+ * indicate it is still allocated; ProcFreeAllSemaphores() needs that.
+ */
}
/*
* ProcFreeAllSemaphores -
- * on exiting the postmaster, we free up all the semaphores allocated
- * to the lmgrs of the backends.
+ * called at shmem_exit time, ie when exiting the postmaster or
+ * destroying shared state for a failed set of backends.
+ * Free up all the semaphores allocated to the lmgrs of the backends.
*/
-void
+static void
ProcFreeAllSemaphores()
{
int i;