]> granicus.if.org Git - postgresql/blobdiff - src/backend/storage/lmgr/proc.c
Banish caddr_t (mostly), use Datum where appropriate.
[postgresql] / src / backend / storage / lmgr / proc.c
index d4e14fb9c4de1dc4ad9a1a08d96f2914bb43847f..e68d96d772951f89f4aaf1093e92b152ca0eed26 100644 (file)
@@ -1,13 +1,14 @@
 /*-------------------------------------------------------------------------
  *
- * proc.c--
+ * proc.c
  *       routines to manage per-process shared memory data structure
  *
- * Copyright (c) 1994, Regents of the University of California
+ * Portions Copyright (c) 1996-2000, PostgreSQL, Inc
+ * Portions Copyright (c) 1994, Regents of the University of California
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.39 1998/06/30 02:33:32 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.80 2000/10/02 19:42:48 petere Exp $
  *
  *-------------------------------------------------------------------------
  */
  *             This is so that we can support more backends. (system-wide semaphore
  *             sets run out pretty fast.)                                -ay 4/95
  *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.39 1998/06/30 02:33:32 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.80 2000/10/02 19:42:48 petere Exp $
  */
+#include "postgres.h"
+
 #include <sys/time.h>
 #include <unistd.h>
-#include <string.h>
 #include <signal.h>
 #include <sys/types.h>
 
-#if defined(solaris_sparc)
+#if defined(solaris_sparc) || defined(__CYGWIN__)
 #include <sys/ipc.h>
 #include <sys/sem.h>
 #endif
 
-#include "postgres.h"
 #include "miscadmin.h"
-#include "libpq/pqsignal.h"
 
-#include "access/xact.h"
-#include "utils/hsearch.h"
 
-#include "storage/ipc.h"
-/* In Ultrix, sem.h must be included after ipc.h */
+/* In Ultrix and QNX, sem.h must be included after ipc.h */
 #include <sys/sem.h>
-#include "storage/buf.h"
-#include "storage/lock.h"
-#include "storage/lmgr.h"
-#include "storage/shmem.h"
-#include "storage/spin.h"
+
 #include "storage/proc.h"
 
-static void HandleDeadLock(int sig);
-static PROC *ProcWakeup(PROC *proc, int errType);
+void           HandleDeadLock(SIGNAL_ARGS);
+static void ProcFreeAllSemaphores(void);
+static bool GetOffWaitqueue(PROC *);
+
+int DeadlockTimeout = 1000;
 
 /* --------------------
  * Spin lock for manipulating the shared process data structure:
@@ -88,12 +84,6 @@ static PROC *ProcWakeup(PROC *proc, int errType);
  */
 SPINLOCK       ProcStructLock;
 
-/*
- * For cleanup routines.  Don't cleanup if the initialization
- * has not happened.
- */
-static bool ProcInitialized = FALSE;
-
 static PROC_HDR *ProcGlobal = NULL;
 
 PROC      *MyProc = NULL;
@@ -102,25 +92,40 @@ static void ProcKill(int exitStatus, int pid);
 static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
 static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
 
+static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause.";
+
 /*
  * InitProcGlobal -
  *       initializes the global process table. We put it here so that
- *       the postmaster can do this initialization. (ProcFreeAllSem needs
+ *       the postmaster can do this initialization. (ProcFreeAllSemaphores needs
  *       to read this table on exiting the postmaster. If we have the first
  *       backend do this, starting up and killing the postmaster without
  *       starting any backends will be a problem.)
+ *
+ *       We also allocate all the per-process semaphores we will need to support
+ *       the requested number of backends.  We used to allocate semaphores
+ *       only when backends were actually started up, but that is bad because
+ *       it lets Postgres fail under load --- a lot of Unix systems are
+ *       (mis)configured with small limits on the number of semaphores, and
+ *       running out when trying to start another backend is a common failure.
+ *       So, now we grab enough semaphores to support the desired max number
+ *       of backends immediately at initialization --- if the sysadmin has set
+ *       MaxBackends higher than his kernel will support, he'll find out sooner
+ *       rather than later.
  */
 void
-InitProcGlobal(IPCKey key)
+InitProcGlobal(IPCKey key, int maxBackends)
 {
        bool            found = false;
 
        /* attach to the free list */
        ProcGlobal = (PROC_HDR *)
-               ShmemInitStruct("Proc Header", (unsigned) sizeof(PROC_HDR), &found);
+               ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found);
 
        /* --------------------
         * We're the first - initialize.
+        * XXX if found should ever be true, it is a sign of impending doom ...
+        * ought to complain if so?
         * --------------------
         */
        if (!found)
@@ -131,6 +136,37 @@ InitProcGlobal(IPCKey key)
                ProcGlobal->currKey = IPCGetProcessSemaphoreInitKey(key);
                for (i = 0; i < MAX_PROC_SEMS / PROC_NSEMS_PER_SET; i++)
                        ProcGlobal->freeSemMap[i] = 0;
+
+               /*
+                * Arrange to delete semas on exit --- set this up now so that we
+                * will clean up if pre-allocation fails...
+                */
+               on_shmem_exit(ProcFreeAllSemaphores, 0);
+
+               /*
+                * Pre-create the semaphores for the first maxBackends processes,
+                * unless we are running as a standalone backend.
+                */
+               if (key != PrivateIPCKey)
+               {
+                       for (i = 0;
+                                i < (maxBackends + PROC_NSEMS_PER_SET - 1) / PROC_NSEMS_PER_SET;
+                                i++)
+                       {
+                               IPCKey          semKey = ProcGlobal->currKey + i;
+                               int                     semId;
+
+                               semId = IpcSemaphoreCreate(semKey,
+                                                                                  PROC_NSEMS_PER_SET,
+                                                                                  IPCProtection,
+                                                                                  IpcSemaphoreDefaultStartValue,
+                                                                                  0);
+                               if (semId < 0)
+                                       elog(FATAL, "InitProcGlobal: IpcSemaphoreCreate failed");
+                               /* mark this sema set allocated */
+                               ProcGlobal->freeSemMap[i] = (1 << PROC_NSEMS_PER_SET);
+                       }
+               }
        }
 }
 
@@ -143,25 +179,18 @@ void
 InitProcess(IPCKey key)
 {
        bool            found = false;
-       int                     semstat;
        unsigned long location,
                                myOffset;
 
-       /* ------------------
-        * Routine called if deadlock timer goes off. See ProcSleep()
-        * ------------------
-        */
-       pqsignal(SIGALRM, HandleDeadLock);
-
        SpinAcquire(ProcStructLock);
 
        /* attach to the free list */
        ProcGlobal = (PROC_HDR *)
-               ShmemInitStruct("Proc Header", (unsigned) sizeof(PROC_HDR), &found);
+               ShmemInitStruct("Proc Header", sizeof(PROC_HDR), &found);
        if (!found)
        {
                /* this should not happen. InitProcGlobal() is called before this. */
-               elog(ERROR, "InitProcess: Proc Header uninitialized");
+               elog(STOP, "InitProcess: Proc Header uninitialized");
        }
 
        if (MyProc != NULL)
@@ -184,13 +213,13 @@ InitProcess(IPCKey key)
        {
 
                /*
-                * have to allocate one.  We can't use the normal shmem index table
-                * mechanism because the proc structure is stored by PID instead
-                * of by a global name (need to look it up by PID when we cleanup
-                * dead processes).
+                * have to allocate one.  We can't use the normal shmem index
+                * table mechanism because the proc structure is stored by PID
+                * instead of by a global name (need to look it up by PID when we
+                * cleanup dead processes).
                 */
 
-               MyProc = (PROC *) ShmemAlloc((unsigned) sizeof(PROC));
+               MyProc = (PROC *) ShmemAlloc(sizeof(PROC));
                if (!MyProc)
                {
                        SpinRelease(ProcStructLock);
@@ -219,12 +248,17 @@ InitProcess(IPCKey key)
 
                ProcGetNewSemKeyAndNum(&semKey, &semNum);
 
+               /*
+                * Note: because of the pre-allocation done in InitProcGlobal,
+                * this call should always attach to an existing semaphore. It
+                * will (try to) create a new group of semaphores only if the
+                * postmaster tries to start more backends than it said it would.
+                */
                semId = IpcSemaphoreCreate(semKey,
                                                                   PROC_NSEMS_PER_SET,
                                                                   IPCProtection,
                                                                   IpcSemaphoreDefaultStartValue,
-                                                                  0,
-                                                                  &semstat);
+                                                                  0);
 
                /*
                 * we might be reusing a semaphore that belongs to a dead backend.
@@ -247,11 +281,10 @@ InitProcess(IPCKey key)
         */
        SpinRelease(ProcStructLock);
 
-       MyProc->pid = 0;
-#if 0
        MyProc->pid = MyProcPid;
-#endif
+       MyProc->databaseId = MyDatabaseId;
        MyProc->xid = InvalidTransactionId;
+       MyProc->xmin = InvalidTransactionId;
 
        /* ----------------
         * Start keeping spin lock stats from here on.  Any botch before
@@ -269,14 +302,47 @@ InitProcess(IPCKey key)
         */
        location = MAKE_OFFSET(MyProc);
        if ((!ShmemPIDLookup(MyProcPid, &location)) || (location != MAKE_OFFSET(MyProc)))
-               elog(FATAL, "InitProc: ShmemPID table broken");
+               elog(STOP, "InitProc: ShmemPID table broken");
 
        MyProc->errType = NO_ERROR;
        SHMQueueElemInit(&(MyProc->links));
 
-       on_shmem_exit(ProcKill, (caddr_t) MyProcPid);
+       on_shmem_exit(ProcKill, (Datum) MyProcPid);
+}
+
+/* -----------------------
+ * get off the wait queue
+ * -----------------------
+ */
+static bool
+GetOffWaitqueue(PROC *proc)
+{
+       bool            getoffed = false;
+
+       LockLockTable();
+       if (proc->links.next != INVALID_OFFSET)
+       {
+               int                     lockmode = proc->token;
+               LOCK    *waitLock = proc->waitLock;
+
+               Assert(waitLock);
+               Assert(waitLock->waitProcs.size > 0);
+               SHMQueueDelete(&(proc->links));
+               --waitLock->waitProcs.size;
+               Assert(waitLock->nHolding > 0);
+               Assert(waitLock->nHolding > proc->waitLock->nActive);
+               --waitLock->nHolding;
+               Assert(waitLock->holders[lockmode] > 0);
+               --waitLock->holders[lockmode];
+               if (waitLock->activeHolders[lockmode] == waitLock->holders[lockmode])
+                       waitLock->waitMask &= ~(1 << lockmode);
+               ProcLockWakeup(&(waitLock->waitProcs), LOCK_LOCKMETHOD(*waitLock), waitLock);
+               getoffed = true;
+       }
+       SHMQueueElemInit(&(proc->links));
+       UnlockLockTable();
 
-       ProcInitialized = TRUE;
+       return getoffed;
 }
 
 /*
@@ -289,6 +355,7 @@ ProcReleaseLocks()
        if (!MyProc)
                return;
        LockReleaseAll(1, &MyProc->lockQueue);
+       GetOffWaitqueue(MyProc);
 }
 
 /*
@@ -309,7 +376,7 @@ ProcRemove(int pid)
 
        location = ShmemPIDDestroy(pid);
        if (location == INVALID_OFFSET)
-               return (FALSE);
+               return FALSE;
        proc = (PROC *) MAKE_PTR(location);
 
        SpinAcquire(ProcStructLock);
@@ -321,7 +388,7 @@ ProcRemove(int pid)
 
        SpinRelease(ProcStructLock);
 
-       return (TRUE);
+       return TRUE;
 }
 
 /*
@@ -358,25 +425,21 @@ ProcKill(int exitStatus, int pid)
         * ---------------
         */
        ProcReleaseSpins(proc);
-       LockReleaseAll(1, &proc->lockQueue);
+       LockReleaseAll(DEFAULT_LOCKMETHOD, &proc->lockQueue);
 
 #ifdef USER_LOCKS
-       LockReleaseAll(0, &proc->lockQueue);
+
+       /*
+        * Assume we have a second lock table.
+        */
+       LockReleaseAll(USER_LOCKMETHOD, &proc->lockQueue);
 #endif
 
        /* ----------------
         * get off the wait queue
         * ----------------
         */
-       LockLockTable();
-       if (proc->links.next != INVALID_OFFSET)
-       {
-               Assert(proc->waitLock->waitProcs.size > 0);
-               SHMQueueDelete(&(proc->links));
-               --proc->waitLock->waitProcs.size;
-       }
-       SHMQueueElemInit(&(proc->links));
-       UnlockLockTable();
+       GetOffWaitqueue(proc);
 
        return;
 }
@@ -398,13 +461,13 @@ ProcQueueAlloc(char *name)
 {
        bool            found;
        PROC_QUEUE *queue = (PROC_QUEUE *)
-       ShmemInitStruct(name, (unsigned) sizeof(PROC_QUEUE), &found);
+               ShmemInitStruct(name, sizeof(PROC_QUEUE), &found);
 
        if (!queue)
-               return (NULL);
+               return NULL;
        if (!found)
                ProcQueueInit(queue);
-       return (queue);
+       return queue;
 }
 
 #endif
@@ -420,6 +483,50 @@ ProcQueueInit(PROC_QUEUE *queue)
 }
 
 
+/*
+ *     Handling cancel request while waiting for lock
+ *
+ */
+static bool lockWaiting = false;
+void
+SetWaitingForLock(bool waiting)
+{
+       if (waiting == lockWaiting)
+               return;
+       lockWaiting = waiting;
+       if (lockWaiting)
+       {
+               /* The lock was already released ? */
+               if (MyProc->links.next == INVALID_OFFSET)
+               {
+                       lockWaiting = false;
+                       return;
+               }
+               if (QueryCancel)                /* cancel request pending */
+               {
+                       if (GetOffWaitqueue(MyProc))
+                       {
+                               lockWaiting = false;
+                               elog(ERROR, "Query cancel requested while waiting lock");
+                       }
+               }
+       }
+}
+void
+LockWaitCancel(void)
+{
+       struct itimerval timeval,
+                               dummy;
+
+       if (!lockWaiting)
+               return;
+       lockWaiting = false;
+       /* Deadlock timer off */
+       MemSet(&timeval, 0, sizeof(struct itimerval));
+       setitimer(ITIMER_REAL, &timeval, &dummy);
+       if (GetOffWaitqueue(MyProc))
+               elog(ERROR, "Query cancel requested while waiting lock");
+}
 
 /*
  * ProcSleep -- put a process to sleep
@@ -434,57 +541,80 @@ ProcQueueInit(PROC_QUEUE *queue)
  * NOTES: The process queue is now a priority queue for locking.
  */
 int
-ProcSleep(PROC_QUEUE *waitQueue,
-                 SPINLOCK spinlock,
-                 int token,
-                 int prio,
+ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
+                 LOCKMETHODCTL *lockctl,
+                 int token,                    /* lockmode */
                  LOCK *lock)
 {
        int                     i;
+       SPINLOCK        spinlock = lockctl->masterLock;
        PROC       *proc;
+       int                     myMask = (1 << token);
+       int                     waitMask = lock->waitMask;
+       int                     aheadHolders[MAX_LOCKMODES];
+       bool            selfConflict = (lockctl->conflictTab[token] & myMask),
+                               prevSame = false;
+       bool            deadlock_checked = false;
        struct itimerval timeval,
                                dummy;
 
-       /*
-        * If the first entries in the waitQueue have a greater priority than
-        * we have, we must be a reader, and they must be a writers, and we
-        * must be here because the current holder is a writer or a reader but
-        * we don't share shared locks if a writer is waiting. We put
-        * ourselves after the writers.  This way, we have a FIFO, but keep
-        * the readers together to give them decent priority, and no one
-        * starves.  Because we group all readers together, a non-empty queue
-        * only has a few possible configurations:
-        *
-        * [readers] [writers] [readers][writers] [writers][readers]
-        * [writers][readers][writers]
-        *
-        * In a full queue, we would have a reader holding a lock, then a writer
-        * gets the lock, then a bunch of readers, made up of readers who
-        * could not share the first readlock because a writer was waiting,
-        * and new readers arriving while the writer had the lock.
-        *
-        */
+       MyProc->token = token;
+       MyProc->waitLock = lock;
+
        proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
 
-       /* If we are a reader, and they are writers, skip past them */
+       /* if we don't conflict with any waiter - be first in queue */
+       if (!(lockctl->conflictTab[token] & waitMask))
+               goto ins;
 
-       for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
-               proc = (PROC *) MAKE_PTR(proc->links.prev);
+       for (i = 1; i < MAX_LOCKMODES; i++)
+               aheadHolders[i] = lock->activeHolders[i];
+       (aheadHolders[token])++;
 
-       /* The rest of the queue is FIFO, with readers first, writers last */
-       for (; i < waitQueue->size && proc->prio <= prio; i++)
-               proc = (PROC *) MAKE_PTR(proc->links.prev);
+       for (i = 0; i < waitQueue->size; i++)
+       {
+               /* am I waiting for him ? */
+               if (lockctl->conflictTab[token] & proc->holdLock)
+               {
+                       /* is he waiting for me ? */
+                       if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+                       {
+                               MyProc->errType = STATUS_ERROR;
+                               elog(NOTICE, DeadLockMessage);
+                               goto rt;
+                       }
+                       /* being waiting for him - go past */
+               }
+               /* if he waits for me */
+               else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+                       break;
+               /* if conflicting locks requested */
+               else if (lockctl->conflictTab[proc->token] & myMask)
+               {
 
-       MyProc->prio = prio;
-       MyProc->token = token;
-       MyProc->waitLock = lock;
+                       /*
+                        * If I request non self-conflicting lock and there are others
+                        * requesting the same lock just before me - stay here.
+                        */
+                       if (!selfConflict && prevSame)
+                               break;
+               }
 
-       /* -------------------
-        * currently, we only need this for the ProcWakeup routines
-        * -------------------
-        */
-       TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
+               /*
+                * Last attempt to don't move any more: if we don't conflict with
+                * rest waiters in queue.
+                */
+               else if (!(lockctl->conflictTab[token] & waitMask))
+                       break;
+
+               prevSame = (proc->token == token);
+               (aheadHolders[proc->token])++;
+               if (aheadHolders[proc->token] == lock->holders[proc->token])
+                       waitMask &= ~(1 << proc->token);
+               proc = (PROC *) MAKE_PTR(proc->links.prev);
+       }
 
+ins:;
        /* -------------------
         * assume that these two operations are atomic (because
         * of the spinlock).
@@ -493,6 +623,7 @@ ProcSleep(PROC_QUEUE *waitQueue,
        SHMQueueInsertTL(&(proc->links), &(MyProc->links));
        waitQueue->size++;
 
+       lock->waitMask |= myMask;
        SpinRelease(spinlock);
 
        /* --------------
@@ -505,14 +636,18 @@ ProcSleep(PROC_QUEUE *waitQueue,
         * --------------
         */
        MemSet(&timeval, 0, sizeof(struct itimerval));
-       timeval.it_value.tv_sec = DEADLOCK_CHECK_TIMER;
+       timeval.it_value.tv_sec = DeadlockTimeout / 1000;
+       timeval.it_value.tv_usec = (DeadlockTimeout % 1000) * 1000;
 
+       SetWaitingForLock(true);
        do
        {
                MyProc->errType = NO_ERROR;             /* reset flag after deadlock check */
 
-               if (setitimer(ITIMER_REAL, &timeval, &dummy))
-                       elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
+               if (!deadlock_checked)
+                       if (setitimer(ITIMER_REAL, &timeval, &dummy))
+                               elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
+               deadlock_checked = true;
 
                /* --------------
                 * if someone wakes us between SpinRelease and IpcSemaphoreLock,
@@ -520,17 +655,18 @@ ProcSleep(PROC_QUEUE *waitQueue,
                 * the semaphore implementation.
                 * --------------
                 */
-               IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+               IpcSemaphoreLock(MyProc->sem.semId, MyProc->sem.semNum,
+                                                IpcExclusiveLock);
        } while (MyProc->errType == STATUS_NOT_FOUND);          /* sleep after deadlock
                                                                                                                 * check */
+       lockWaiting = false;
 
        /* ---------------
         * We were awoken before a timeout - now disable the timer
         * ---------------
         */
        timeval.it_value.tv_sec = 0;
-
-
+       timeval.it_value.tv_usec = 0;
        if (setitimer(ITIMER_REAL, &timeval, &dummy))
                elog(FATAL, "ProcSleep: Unable to diable timer for process wakeup");
 
@@ -541,7 +677,14 @@ ProcSleep(PROC_QUEUE *waitQueue,
         */
        SpinAcquire(spinlock);
 
-       return (MyProc->errType);
+rt:;
+
+#ifdef LOCK_DEBUG
+       /* Just to get meaningful debug messages from DumpLocks() */
+       MyProc->waitLock = (LOCK *) NULL;
+#endif
+
+       return MyProc->errType;
 }
 
 
@@ -551,7 +694,7 @@ ProcSleep(PROC_QUEUE *waitQueue,
  *      remove the process from the wait queue and set its links invalid.
  *      RETURN: the next process in the wait queue.
  */
-static PROC *
+PROC *
 ProcWakeup(PROC *proc, int errType)
 {
        PROC       *retProc;
@@ -560,7 +703,7 @@ ProcWakeup(PROC *proc, int errType)
 
        if (proc->links.prev == INVALID_OFFSET ||
                proc->links.next == INVALID_OFFSET)
-               return ((PROC *) NULL);
+               return (PROC *) NULL;
 
        retProc = (PROC *) MAKE_PTR(proc->links.prev);
 
@@ -583,18 +726,40 @@ int
 ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
 {
        PROC       *proc;
-       int                     count;
+       int                     count = 0;
+       int                     last_locktype = 0;
+       int                     queue_size = queue->size;
+
+       Assert(queue->size >= 0);
 
        if (!queue->size)
-               return (STATUS_NOT_FOUND);
+               return STATUS_NOT_FOUND;
 
        proc = (PROC *) MAKE_PTR(queue->links.prev);
-       count = 0;
-       while ((LockResolveConflicts(lockmethod,
+       while ((queue_size--) && (proc))
+       {
+
+               /*
+                * This proc will conflict as the previous one did, don't even
+                * try.
+                */
+               if (proc->token == last_locktype)
+                       continue;
+
+               /*
+                * Does this proc conflict with locks held by others ?
+                */
+               if (LockResolveConflicts(lockmethod,
                                                                 lock,
                                                                 proc->token,
-                                                                proc->xid) == STATUS_OK))
-       {
+                                                                proc->xid,
+                                                                (XIDLookupEnt *) NULL) != STATUS_OK)
+               {
+                       if (count != 0)
+                               break;
+                       last_locktype = proc->token;
+                       continue;
+               }
 
                /*
                 * there was a waiting process, grant it the lock before waking it
@@ -603,24 +768,34 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
                 * time that the awoken process begins executing again.
                 */
                GrantLock(lock, proc->token);
-               queue->size--;
 
                /*
                 * ProcWakeup removes proc from the lock waiting process queue and
                 * returns the next proc in chain.
                 */
-               proc = ProcWakeup(proc, NO_ERROR);
 
                count++;
-               if (!proc || queue->size == 0)
-                       break;
+               queue->size--;
+               proc = ProcWakeup(proc, NO_ERROR);
        }
 
+       Assert(queue->size >= 0);
+
        if (count)
-               return (STATUS_OK);
+               return STATUS_OK;
        else
+       {
                /* Something is still blocking us.      May have deadlocked. */
-               return (STATUS_NOT_FOUND);
+#ifdef LOCK_DEBUG
+               if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+               {
+                       elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process", MAKE_OFFSET(lock));
+                       if (Debug_deadlocks)
+                       DumpAllLocks();
+               }
+#endif
+               return STATUS_NOT_FOUND;
+       }
 }
 
 void
@@ -630,15 +805,15 @@ ProcAddLock(SHM_QUEUE *elem)
 }
 
 /* --------------------
- * We only get to this routine if we got SIGALRM after DEADLOCK_CHECK_TIMER
+ * We only get to this routine if we got SIGALRM after DeadlockTimeout
  * while waiting for a lock to be released by some other process.  If we have
  * a real deadlock, we must also indicate that I'm no longer waiting
  * on a lock so that other processes don't try to wake me up and screw
  * up my semaphore.
  * --------------------
  */
-static void
-HandleDeadLock(int sig)
+void
+HandleDeadLock(SIGNAL_ARGS)
 {
        LOCK       *mywaitlock;
 
@@ -679,14 +854,15 @@ HandleDeadLock(int sig)
                return;
        }
 
-#ifdef DEADLOCK_DEBUG
-       DumpLocks();
+#ifdef LOCK_DEBUG
+    if (Debug_deadlocks)
+        DumpAllLocks();
 #endif
 
-       if (!DeadLockCheck(&(MyProc->lockQueue), MyProc->waitLock, true))
+       MyProc->errType = STATUS_NOT_FOUND;
+       if (!DeadLockCheck(MyProc, MyProc->waitLock))
        {
                UnlockLockTable();
-               MyProc->errType = STATUS_NOT_FOUND;
                return;
        }
 
@@ -697,6 +873,7 @@ HandleDeadLock(int sig)
         * ------------------------
         */
        Assert(mywaitlock->waitProcs.size > 0);
+       lockWaiting = false;
        --mywaitlock->waitProcs.size;
        SHMQueueDelete(&(MyProc->links));
        SHMQueueElemInit(&(MyProc->links));
@@ -706,7 +883,8 @@ HandleDeadLock(int sig)
         * I was awoken by a signal, not by someone unlocking my semaphore.
         * ------------------
         */
-       IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum, IpcExclusiveLock);
+       IpcSemaphoreUnlock(MyProc->sem.semId, MyProc->sem.semNum,
+                                          IpcExclusiveLock);
 
        /* -------------
         * Set MyProc->errType to STATUS_ERROR so that we abort after
@@ -722,7 +900,7 @@ HandleDeadLock(int sig)
         */
        UnlockLockTable();
 
-       elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause.");
+       elog(NOTICE, DeadLockMessage);
        return;
 }
 
@@ -744,6 +922,7 @@ ProcReleaseSpins(PROC *proc)
                        SpinRelease(i);
                }
        }
+       AbortBufferIO();
 }
 
 /*****************************************************************************
@@ -762,20 +941,20 @@ ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum)
 {
        int                     i;
        int32      *freeSemMap = ProcGlobal->freeSemMap;
-       unsigned int fullmask;
+       int32           fullmask = (1 << (PROC_NSEMS_PER_SET + 1)) - 1;
 
        /*
         * we hold ProcStructLock when entering this routine. We scan through
         * the bitmap to look for a free semaphore.
         */
-       fullmask = ~0 >> (32 - PROC_NSEMS_PER_SET);
+
        for (i = 0; i < MAX_PROC_SEMS / PROC_NSEMS_PER_SET; i++)
        {
                int                     mask = 1;
                int                     j;
 
                if (freeSemMap[i] == fullmask)
-                       continue;                       /* none free for this set */
+                       continue;                       /* this set is fully allocated */
 
                for (j = 0; j < PROC_NSEMS_PER_SET; j++)
                {
@@ -783,9 +962,10 @@ ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum)
                        {
 
                                /*
-                                * a free semaphore found. Mark it as allocated.
+                                * a free semaphore found. Mark it as allocated. Also set
+                                * the bit indicating whole set is allocated.
                                 */
-                               freeSemMap[i] |= mask;
+                               freeSemMap[i] |= mask + (1 << PROC_NSEMS_PER_SET);
 
                                *key = ProcGlobal->currKey + i;
                                *semNum = j;
@@ -801,8 +981,7 @@ ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum)
 
 /*
  * ProcFreeSem -
- *       free up our semaphore in the semaphore set. If we're the last one
- *       in the set, also remove the semaphore set.
+ *       free up our semaphore in the semaphore set.
  */
 static void
 ProcFreeSem(IpcSemaphoreKey semKey, int semNum)
@@ -815,16 +994,22 @@ ProcFreeSem(IpcSemaphoreKey semKey, int semNum)
        mask = ~(1 << semNum);
        freeSemMap[i] &= mask;
 
-       if (freeSemMap[i] == 0)
-               IpcSemaphoreKill(semKey);
+       /*
+        * Formerly we'd release a semaphore set if it was now completely
+        * unused, but now we keep the semaphores to ensure we won't run out
+        * when starting new backends --- cf. InitProcGlobal.  Note that the
+        * PROC_NSEMS_PER_SET+1'st bit of the freeSemMap entry remains set to
+        * indicate it is still allocated; ProcFreeAllSemaphores() needs that.
+        */
 }
 
 /*
  * ProcFreeAllSemaphores -
- *       on exiting the postmaster, we free up all the semaphores allocated
- *       to the lmgrs of the backends.
+ *       called at shmem_exit time, ie when exiting the postmaster or
+ *       destroying shared state for a failed set of backends.
+ *       Free up all the semaphores allocated to the lmgrs of the backends.
  */
-void
+static void
 ProcFreeAllSemaphores()
 {
        int                     i;