X-Git-Url: https://granicus.if.org/sourcecode?a=blobdiff_plain;f=src%2Fbackend%2Fstorage%2Flmgr%2Fproc.c;h=e68d96d772951f89f4aaf1093e92b152ca0eed26;hb=416bbbffa3b0ffc2fde3893ea0928206b10afb0a;hp=be91266cf6fff3c752867248263563491d858e16;hpb=a32450a5855eed4bfd756ef292ee45d3c754665b;p=postgresql diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index be91266cf6..e68d96d772 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -1,13 +1,14 @@ /*------------------------------------------------------------------------- * - * proc.c-- + * proc.c * routines to manage per-process shared memory data structure * - * Copyright (c) 1994, Regents of the University of California + * Portions Copyright (c) 1996-2000, PostgreSQL, Inc + * Portions Copyright (c) 1994, Regents of the University of California * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.34 1998/02/26 04:36:12 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.80 2000/10/02 19:42:48 petere Exp $ * *------------------------------------------------------------------------- */ @@ -46,38 +47,33 @@ * This is so that we can support more backends. (system-wide semaphore * sets run out pretty fast.) -ay 4/95 * - * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.34 1998/02/26 04:36:12 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.80 2000/10/02 19:42:48 petere Exp $ */ +#include "postgres.h" + #include #include -#include #include #include -#if defined(sparc_solaris) +#if defined(solaris_sparc) || defined(__CYGWIN__) #include #include #endif -#include "postgres.h" #include "miscadmin.h" -#include "libpq/pqsignal.h" -#include "access/xact.h" -#include "utils/hsearch.h" -#include "storage/ipc.h" -/* In Ultrix, sem.h must be included after ipc.h */ +/* In Ultrix and QNX, sem.h must be included after ipc.h */ #include -#include "storage/buf.h" -#include "storage/lock.h" -#include "storage/lmgr.h" -#include "storage/shmem.h" -#include "storage/spin.h" + #include "storage/proc.h" -static void HandleDeadLock(int sig); -static PROC *ProcWakeup(PROC *proc, int errType); +void HandleDeadLock(SIGNAL_ARGS); +static void ProcFreeAllSemaphores(void); +static bool GetOffWaitqueue(PROC *); + +int DeadlockTimeout = 1000; /* -------------------- * Spin lock for manipulating the shared process data structure: @@ -88,12 +84,6 @@ static PROC *ProcWakeup(PROC *proc, int errType); */ SPINLOCK ProcStructLock; -/* - * For cleanup routines. Don't cleanup if the initialization - * has not happened. - */ -static bool ProcInitialized = FALSE; - static PROC_HDR *ProcGlobal = NULL; PROC *MyProc = NULL; @@ -102,25 +92,40 @@ static void ProcKill(int exitStatus, int pid); static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum); static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum); +static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause."; + /* * InitProcGlobal - * initializes the global process table. We put it here so that - * the postmaster can do this initialization. (ProcFreeAllSem needs + * the postmaster can do this initialization. (ProcFreeAllSemaphores needs * to read this table on exiting the postmaster. If we have the first * backend do this, starting up and killing the postmaster without * starting any backends will be a problem.) + * + * We also allocate all the per-process semaphores we will need to support + * the requested number of backends. We used to allocate semaphores + * only when backends were actually started up, but that is bad because + * it lets Postgres fail under load --- a lot of Unix systems are + * (mis)configured with small limits on the number of semaphores, and + * running out when trying to start another backend is a common failure. + * So, now we grab enough semaphores to support the desired max number + * of backends immediately at initialization --- if the sysadmin has set + * MaxBackends higher than his kernel will support, he'll find out sooner + * rather than later. */ void -InitProcGlobal(IPCKey key) +InitProcGlobal(IPCKey key, int maxBackends) { bool found = false; /* attach to the free list */ ProcGlobal = (PROC_HDR *) - ShmemInitStruct("Proc Header", (unsigned) sizeof(PROC_HDR), &found); + ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found); /* -------------------- * We're the first - initialize. + * XXX if found should ever be true, it is a sign of impending doom ... + * ought to complain if so? * -------------------- */ if (!found) @@ -131,6 +136,37 @@ InitProcGlobal(IPCKey key) ProcGlobal->currKey = IPCGetProcessSemaphoreInitKey(key); for (i = 0; i < MAX_PROC_SEMS / PROC_NSEMS_PER_SET; i++) ProcGlobal->freeSemMap[i] = 0; + + /* + * Arrange to delete semas on exit --- set this up now so that we + * will clean up if pre-allocation fails... + */ + on_shmem_exit(ProcFreeAllSemaphores, 0); + + /* + * Pre-create the semaphores for the first maxBackends processes, + * unless we are running as a standalone backend. + */ + if (key != PrivateIPCKey) + { + for (i = 0; + i < (maxBackends + PROC_NSEMS_PER_SET - 1) / PROC_NSEMS_PER_SET; + i++) + { + IPCKey semKey = ProcGlobal->currKey + i; + int semId; + + semId = IpcSemaphoreCreate(semKey, + PROC_NSEMS_PER_SET, + IPCProtection, + IpcSemaphoreDefaultStartValue, + 0); + if (semId < 0) + elog(FATAL, "InitProcGlobal: IpcSemaphoreCreate failed"); + /* mark this sema set allocated */ + ProcGlobal->freeSemMap[i] = (1 << PROC_NSEMS_PER_SET); + } + } } } @@ -143,25 +179,18 @@ void InitProcess(IPCKey key) { bool found = false; - int semstat; unsigned long location, myOffset; - /* ------------------ - * Routine called if deadlock timer goes off. See ProcSleep() - * ------------------ - */ - pqsignal(SIGALRM, HandleDeadLock); - SpinAcquire(ProcStructLock); /* attach to the free list */ ProcGlobal = (PROC_HDR *) - ShmemInitStruct("Proc Header", (unsigned) sizeof(PROC_HDR), &found); + ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found); if (!found) { /* this should not happen. InitProcGlobal() is called before this. */ - elog(ERROR, "InitProcess: Proc Header uninitialized"); + elog(STOP, "InitProcess: Proc Header uninitialized"); } if (MyProc != NULL) @@ -184,13 +213,13 @@ InitProcess(IPCKey key) { /* - * have to allocate one. We can't use the normal binding table - * mechanism because the proc structure is stored by PID instead - * of by a global name (need to look it up by PID when we cleanup - * dead processes). + * have to allocate one. We can't use the normal shmem index + * table mechanism because the proc structure is stored by PID + * instead of by a global name (need to look it up by PID when we + * cleanup dead processes). */ - MyProc = (PROC *) ShmemAlloc((unsigned) sizeof(PROC)); + MyProc = (PROC *) ShmemAlloc(sizeof(PROC)); if (!MyProc) { SpinRelease(ProcStructLock); @@ -219,12 +248,17 @@ InitProcess(IPCKey key) ProcGetNewSemKeyAndNum(&semKey, &semNum); + /* + * Note: because of the pre-allocation done in InitProcGlobal, + * this call should always attach to an existing semaphore. It + * will (try to) create a new group of semaphores only if the + * postmaster tries to start more backends than it said it would. + */ semId = IpcSemaphoreCreate(semKey, PROC_NSEMS_PER_SET, IPCProtection, IpcSemaphoreDefaultStartValue, - 0, - &semstat); + 0); /* * we might be reusing a semaphore that belongs to a dead backend. @@ -239,9 +273,7 @@ InitProcess(IPCKey key) MyProc->sem.semKey = semKey; } else - { MyProc->sem.semId = -1; - } /* ---------------------- * Release the lock. @@ -249,11 +281,10 @@ InitProcess(IPCKey key) */ SpinRelease(ProcStructLock); - MyProc->pid = 0; -#if 0 MyProc->pid = MyProcPid; -#endif + MyProc->databaseId = MyDatabaseId; MyProc->xid = InvalidTransactionId; + MyProc->xmin = InvalidTransactionId; /* ---------------- * Start keeping spin lock stats from here on. Any botch before @@ -263,7 +294,7 @@ InitProcess(IPCKey key) MemSet(MyProc->sLocks, 0, MAX_SPINS * sizeof(*MyProc->sLocks)); /* ------------------------- - * Install ourselves in the binding table. The name to + * Install ourselves in the shmem index table. The name to * use is determined by the OS-assigned process id. That * allows the cleanup process to find us after any untimely * exit. @@ -271,16 +302,47 @@ InitProcess(IPCKey key) */ location = MAKE_OFFSET(MyProc); if ((!ShmemPIDLookup(MyProcPid, &location)) || (location != MAKE_OFFSET(MyProc))) - { - elog(FATAL, "InitProc: ShmemPID table broken"); - } + elog(STOP, "InitProc: ShmemPID table broken"); MyProc->errType = NO_ERROR; SHMQueueElemInit(&(MyProc->links)); - on_exitpg(ProcKill, (caddr_t) MyProcPid); + on_shmem_exit(ProcKill, (Datum) MyProcPid); +} + +/* ----------------------- + * get off the wait queue + * ----------------------- + */ +static bool +GetOffWaitqueue(PROC *proc) +{ + bool getoffed = false; + + LockLockTable(); + if (proc->links.next != INVALID_OFFSET) + { + int lockmode = proc->token; + LOCK *waitLock = proc->waitLock; + + Assert(waitLock); + Assert(waitLock->waitProcs.size > 0); + SHMQueueDelete(&(proc->links)); + --waitLock->waitProcs.size; + Assert(waitLock->nHolding > 0); + Assert(waitLock->nHolding > proc->waitLock->nActive); + --waitLock->nHolding; + Assert(waitLock->holders[lockmode] > 0); + --waitLock->holders[lockmode]; + if (waitLock->activeHolders[lockmode] == waitLock->holders[lockmode]) + waitLock->waitMask &= ~(1 << lockmode); + ProcLockWakeup(&(waitLock->waitProcs), LOCK_LOCKMETHOD(*waitLock), waitLock); + getoffed = true; + } + SHMQueueElemInit(&(proc->links)); + UnlockLockTable(); - ProcInitialized = TRUE; + return getoffed; } /* @@ -293,6 +355,7 @@ ProcReleaseLocks() if (!MyProc) return; LockReleaseAll(1, &MyProc->lockQueue); + GetOffWaitqueue(MyProc); } /* @@ -313,7 +376,7 @@ ProcRemove(int pid) location = ShmemPIDDestroy(pid); if (location == INVALID_OFFSET) - return (FALSE); + return FALSE; proc = (PROC *) MAKE_PTR(location); SpinAcquire(ProcStructLock); @@ -325,7 +388,7 @@ ProcRemove(int pid) SpinRelease(ProcStructLock); - return (TRUE); + return TRUE; } /* @@ -353,37 +416,30 @@ ProcKill(int exitStatus, int pid) proc = (PROC *) MAKE_PTR(location); - if (proc != MyProc) - { - Assert(pid != MyProcPid); - } - else - MyProc = NULL; + Assert(proc == MyProc || pid != MyProcPid); + + MyProc = NULL; /* --------------- * Assume one lock table. * --------------- */ ProcReleaseSpins(proc); - LockReleaseAll(1, &proc->lockQueue); + LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue); #ifdef USER_LOCKS - LockReleaseAll(0, &proc->lockQueue); + + /* + * Assume we have a second lock table. + */ + LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue); #endif /* ---------------- * get off the wait queue * ---------------- */ - LockLockTable(); - if (proc->links.next != INVALID_OFFSET) - { - Assert(proc->waitLock->waitProcs.size > 0); - SHMQueueDelete(&(proc->links)); - --proc->waitLock->waitProcs.size; - } - SHMQueueElemInit(&(proc->links)); - UnlockLockTable(); + GetOffWaitqueue(proc); return; } @@ -405,17 +461,13 @@ ProcQueueAlloc(char *name) { bool found; PROC_QUEUE *queue = (PROC_QUEUE *) - ShmemInitStruct(name, (unsigned) sizeof(PROC_QUEUE), &found); + ShmemInitStruct(name, sizeof(PROC_QUEUE), &found); if (!queue) - { - return (NULL); - } + return NULL; if (!found) - { ProcQueueInit(queue); - } - return (queue); + return queue; } #endif @@ -431,6 +483,50 @@ ProcQueueInit(PROC_QUEUE *queue) } +/* + * Handling cancel request while waiting for lock + * + */ +static bool lockWaiting = false; +void +SetWaitingForLock(bool waiting) +{ + if (waiting == lockWaiting) + return; + lockWaiting = waiting; + if (lockWaiting) + { + /* The lock was already released ? */ + if (MyProc->links.next == INVALID_OFFSET) + { + lockWaiting = false; + return; + } + if (QueryCancel) /* cancel request pending */ + { + if (GetOffWaitqueue(MyProc)) + { + lockWaiting = false; + elog(ERROR, "Query cancel requested while waiting lock"); + } + } + } +} +void +LockWaitCancel(void) +{ + struct itimerval timeval, + dummy; + + if (!lockWaiting) + return; + lockWaiting = false; + /* Deadlock timer off */ + MemSet(&timeval, 0, sizeof(struct itimerval)); + setitimer(ITIMER_REAL, &timeval, &dummy); + if (GetOffWaitqueue(MyProc)) + elog(ERROR, "Query cancel requested while waiting lock"); +} /* * ProcSleep -- put a process to sleep @@ -445,57 +541,80 @@ ProcQueueInit(PROC_QUEUE *queue) * NOTES: The process queue is now a priority queue for locking. */ int -ProcSleep(PROC_QUEUE *waitQueue, - SPINLOCK spinlock, - int token, - int prio, +ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */ + LOCKMETHODCTL *lockctl, + int token, /* lockmode */ LOCK *lock) { int i; + SPINLOCK spinlock = lockctl->masterLock; PROC *proc; + int myMask = (1 << token); + int waitMask = lock->waitMask; + int aheadHolders[MAX_LOCKMODES]; + bool selfConflict = (lockctl->conflictTab[token] & myMask), + prevSame = false; + bool deadlock_checked = false; struct itimerval timeval, dummy; - /* - * If the first entries in the waitQueue have a greater priority than - * we have, we must be a reader, and they must be a writers, and we - * must be here because the current holder is a writer or a reader but - * we don't share shared locks if a writer is waiting. We put - * ourselves after the writers. This way, we have a FIFO, but keep - * the readers together to give them decent priority, and no one - * starves. Because we group all readers together, a non-empty queue - * only has a few possible configurations: - * - * [readers] [writers] [readers][writers] [writers][readers] - * [writers][readers][writers] - * - * In a full queue, we would have a reader holding a lock, then a writer - * gets the lock, then a bunch of readers, made up of readers who - * could not share the first readlock because a writer was waiting, - * and new readers arriving while the writer had the lock. - * - */ + MyProc->token = token; + MyProc->waitLock = lock; + proc = (PROC *) MAKE_PTR(waitQueue->links.prev); - /* If we are a reader, and they are writers, skip past them */ + /* if we don't conflict with any waiter - be first in queue */ + if (!(lockctl->conflictTab[token] & waitMask)) + goto ins; - for (i = 0; i < waitQueue->size && proc->prio > prio; i++) - proc = (PROC *) MAKE_PTR(proc->links.prev); + for (i = 1; i < MAX_LOCKMODES; i++) + aheadHolders[i] = lock->activeHolders[i]; + (aheadHolders[token])++; - /* The rest of the queue is FIFO, with readers first, writers last */ - for (; i < waitQueue->size && proc->prio <= prio; i++) - proc = (PROC *) MAKE_PTR(proc->links.prev); + for (i = 0; i < waitQueue->size; i++) + { + /* am I waiting for him ? */ + if (lockctl->conflictTab[token] & proc->holdLock) + { + /* is he waiting for me ? */ + if (lockctl->conflictTab[proc->token] & MyProc->holdLock) + { + MyProc->errType = STATUS_ERROR; + elog(NOTICE, DeadLockMessage); + goto rt; + } + /* being waiting for him - go past */ + } + /* if he waits for me */ + else if (lockctl->conflictTab[proc->token] & MyProc->holdLock) + break; + /* if conflicting locks requested */ + else if (lockctl->conflictTab[proc->token] & myMask) + { - MyProc->prio = prio; - MyProc->token = token; - MyProc->waitLock = lock; + /* + * If I request non self-conflicting lock and there are others + * requesting the same lock just before me - stay here. + */ + if (!selfConflict && prevSame) + break; + } - /* ------------------- - * currently, we only need this for the ProcWakeup routines - * ------------------- - */ - TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid); + /* + * Last attempt to don't move any more: if we don't conflict with + * rest waiters in queue. + */ + else if (!(lockctl->conflictTab[token] & waitMask)) + break; + prevSame = (proc->token == token); + (aheadHolders[proc->token])++; + if (aheadHolders[proc->token] == lock->holders[proc->token]) + waitMask &= ~(1 << proc->token); + proc = (PROC *) MAKE_PTR(proc->links.prev); + } + +ins:; /* ------------------- * assume that these two operations are atomic (because * of the spinlock). @@ -504,6 +623,7 @@ ProcSleep(PROC_QUEUE *waitQueue, SHMQueueInsertTL(&(proc->links), &(MyProc->links)); waitQueue->size++; + lock->waitMask |= myMask; SpinRelease(spinlock); /* -------------- @@ -516,14 +636,18 @@ ProcSleep(PROC_QUEUE *waitQueue, * -------------- */ MemSet(&timeval, 0, sizeof(struct itimerval)); - timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER; + timeval.it_value.tv_sec = DeadlockTimeout / 1000; + timeval.it_value.tv_usec = (DeadlockTimeout % 1000) * 1000; + SetWaitingForLock(true); do { MyProc->errType = NO_ERROR; /* reset flag after deadlock check */ - if (setitimer(ITIMER_REAL, &timeval, &dummy)) - elog(FATAL, "ProcSleep: Unable to set timer for process wakeup"); + if (!deadlock_checked) + if (setitimer(ITIMER_REAL, &timeval, &dummy)) + elog(FATAL, "ProcSleep: Unable to set timer for process wakeup"); + deadlock_checked = true; /* -------------- * if someone wakes us between SpinRelease and IpcSemaphoreLock, @@ -531,17 +655,18 @@ ProcSleep(PROC_QUEUE *waitQueue, * the semaphore implementation. * -------------- */ - IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock); + IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, + IpcExclusiveLock); } while (MyProc->errType == STATUS_NOT_FOUND); /* sleep after deadlock * check */ + lockWaiting = false; /* --------------- * We were awoken before a timeout - now disable the timer * --------------- */ timeval.it_value.tv_sec = 0; - - + timeval.it_value.tv_usec = 0; if (setitimer(ITIMER_REAL, &timeval, &dummy)) elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup"); @@ -552,7 +677,14 @@ ProcSleep(PROC_QUEUE *waitQueue, */ SpinAcquire(spinlock); - return (MyProc->errType); +rt:; + +#ifdef LOCK_DEBUG + /* Just to get meaningful debug messages from DumpLocks() */ + MyProc->waitLock = (LOCK *) NULL; +#endif + + return MyProc->errType; } @@ -562,7 +694,7 @@ ProcSleep(PROC_QUEUE *waitQueue, * remove the process from the wait queue and set its links invalid. * RETURN: the next process in the wait queue. */ -static PROC * +PROC * ProcWakeup(PROC *proc, int errType) { PROC *retProc; @@ -571,7 +703,7 @@ ProcWakeup(PROC *proc, int errType) if (proc->links.prev == INVALID_OFFSET || proc->links.next == INVALID_OFFSET) - return ((PROC *) NULL); + return (PROC *) NULL; retProc = (PROC *) MAKE_PTR(proc->links.prev); @@ -591,47 +723,79 @@ ProcWakeup(PROC *proc, int errType) * released. */ int -ProcLockWakeup(PROC_QUEUE *queue, char *ltable, char *lock) +ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock) { PROC *proc; - int count; + int count = 0; + int last_locktype = 0; + int queue_size = queue->size; + + Assert(queue->size >= 0); if (!queue->size) - return (STATUS_NOT_FOUND); + return STATUS_NOT_FOUND; proc = (PROC *) MAKE_PTR(queue->links.prev); - count = 0; - while ((LockResolveConflicts((LOCKTAB *) ltable, - (LOCK *) lock, - proc->token, - proc->xid) == STATUS_OK)) + while ((queue_size--) && (proc)) { + /* + * This proc will conflict as the previous one did, don't even + * try. + */ + if (proc->token == last_locktype) + continue; + + /* + * Does this proc conflict with locks held by others ? + */ + if (LockResolveConflicts(lockmethod, + lock, + proc->token, + proc->xid, + (XIDLookupEnt *) NULL) != STATUS_OK) + { + if (count != 0) + break; + last_locktype = proc->token; + continue; + } + /* * there was a waiting process, grant it the lock before waking it * up. This will prevent another process from seizing the lock * between the time we release the lock master (spinlock) and the * time that the awoken process begins executing again. */ - GrantLock((LOCK *) lock, proc->token); - queue->size--; + GrantLock(lock, proc->token); /* * ProcWakeup removes proc from the lock waiting process queue and * returns the next proc in chain. */ - proc = ProcWakeup(proc, NO_ERROR); count++; - if (!proc || queue->size == 0) - break; + queue->size--; + proc = ProcWakeup(proc, NO_ERROR); } + Assert(queue->size >= 0); + if (count) - return (STATUS_OK); + return STATUS_OK; else + { /* Something is still blocking us. May have deadlocked. */ - return (STATUS_NOT_FOUND); +#ifdef LOCK_DEBUG + if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) + { + elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", MAKE_OFFSET(lock)); + if (Debug_deadlocks) + DumpAllLocks(); + } +#endif + return STATUS_NOT_FOUND; + } } void @@ -641,15 +805,15 @@ ProcAddLock(SHM_QUEUE *elem) } /* -------------------- - * We only get to this routine if we got SIGALRM after DEADLOCK_CHECK_TIMER + * We only get to this routine if we got SIGALRM after DeadlockTimeout * while waiting for a lock to be released by some other process. If we have * a real deadlock, we must also indicate that I'm no longer waiting * on a lock so that other processes don't try to wake me up and screw * up my semaphore. * -------------------- */ -static void -HandleDeadLock(int sig) +void +HandleDeadLock(SIGNAL_ARGS) { LOCK *mywaitlock; @@ -690,14 +854,15 @@ HandleDeadLock(int sig) return; } -#ifdef DEADLOCK_DEBUG - DumpLocks(); +#ifdef LOCK_DEBUG + if (Debug_deadlocks) + DumpAllLocks(); #endif - if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true)) + MyProc->errType = STATUS_NOT_FOUND; + if (!DeadLockCheck(MyProc, MyProc->waitLock)) { UnlockLockTable(); - MyProc->errType = STATUS_NOT_FOUND; return; } @@ -708,6 +873,7 @@ HandleDeadLock(int sig) * ------------------------ */ Assert(mywaitlock->waitProcs.size > 0); + lockWaiting = false; --mywaitlock->waitProcs.size; SHMQueueDelete(&(MyProc->links)); SHMQueueElemInit(&(MyProc->links)); @@ -717,7 +883,8 @@ HandleDeadLock(int sig) * I was awoken by a signal, not by someone unlocking my semaphore. * ------------------ */ - IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock); + IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, + IpcExclusiveLock); /* ------------- * Set MyProc->errType to STATUS_ERROR so that we abort after @@ -733,7 +900,7 @@ HandleDeadLock(int sig) */ UnlockLockTable(); - elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause."); + elog(NOTICE, DeadLockMessage); return; } @@ -755,6 +922,7 @@ ProcReleaseSpins(PROC *proc) SpinRelease(i); } } + AbortBufferIO(); } /***************************************************************************** @@ -773,20 +941,20 @@ ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum) { int i; int32 *freeSemMap = ProcGlobal->freeSemMap; - unsigned int fullmask; + int32 fullmask = (1 << (PROC_NSEMS_PER_SET + 1)) - 1; /* * we hold ProcStructLock when entering this routine. We scan through * the bitmap to look for a free semaphore. */ - fullmask = ~0 >> (32 - PROC_NSEMS_PER_SET); + for (i = 0; i < MAX_PROC_SEMS / PROC_NSEMS_PER_SET; i++) { int mask = 1; int j; if (freeSemMap[i] == fullmask) - continue; /* none free for this set */ + continue; /* this set is fully allocated */ for (j = 0; j < PROC_NSEMS_PER_SET; j++) { @@ -794,9 +962,10 @@ ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum) { /* - * a free semaphore found. Mark it as allocated. + * a free semaphore found. Mark it as allocated. Also set + * the bit indicating whole set is allocated. */ - freeSemMap[i] |= mask; + freeSemMap[i] |= mask + (1 << PROC_NSEMS_PER_SET); *key = ProcGlobal->currKey + i; *semNum = j; @@ -812,8 +981,7 @@ ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum) /* * ProcFreeSem - - * free up our semaphore in the semaphore set. If we're the last one - * in the set, also remove the semaphore set. + * free up our semaphore in the semaphore set. */ static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum) @@ -826,16 +994,22 @@ ProcFreeSem(IpcSemaphoreKey semKey, int semNum) mask = ~(1 << semNum); freeSemMap[i] &= mask; - if (freeSemMap[i] == 0) - IpcSemaphoreKill(semKey); + /* + * Formerly we'd release a semaphore set if it was now completely + * unused, but now we keep the semaphores to ensure we won't run out + * when starting new backends --- cf. InitProcGlobal. Note that the + * PROC_NSEMS_PER_SET+1'st bit of the freeSemMap entry remains set to + * indicate it is still allocated; ProcFreeAllSemaphores() needs that. + */ } /* * ProcFreeAllSemaphores - - * on exiting the postmaster, we free up all the semaphores allocated - * to the lmgrs of the backends. + * called at shmem_exit time, ie when exiting the postmaster or + * destroying shared state for a failed set of backends. + * Free up all the semaphores allocated to the lmgrs of the backends. */ -void +static void ProcFreeAllSemaphores() { int i;