]> granicus.if.org Git - postgresql/commitdiff
Re-implement deadlock detection and resolution, per design notes posted
authorTom Lane <tgl@sss.pgh.pa.us>
Thu, 25 Jan 2001 03:31:16 +0000 (03:31 +0000)
committerTom Lane <tgl@sss.pgh.pa.us>
Thu, 25 Jan 2001 03:31:16 +0000 (03:31 +0000)
to pghackers on 18-Jan-01.

src/backend/storage/lmgr/Makefile
src/backend/storage/lmgr/README
src/backend/storage/lmgr/deadlock.c [new file with mode: 0644]
src/backend/storage/lmgr/lock.c
src/backend/storage/lmgr/proc.c
src/include/storage/lock.h
src/include/storage/proc.h

index fca9a24a57331955cf2f1323138f20f699e6b52e..37cf81b1aa7bcbd44683c5e6d483210d8865e739 100644 (file)
@@ -4,7 +4,7 @@
 #    Makefile for storage/lmgr
 #
 # IDENTIFICATION
-#    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.14 2000/08/31 16:10:36 petere Exp $
+#    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.15 2001/01/25 03:31:16 tgl Exp $
 #
 #-------------------------------------------------------------------------
 
@@ -12,7 +12,7 @@ subdir = src/backend/storage/lmgr
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = lmgr.o lock.o proc.o
+OBJS = lmgr.o lock.o proc.o deadlock.o
 
 all: SUBSYS.o
 
index af9fbc8421b1291d81ca8af14c1a54fcf0d0ab23..72e0d16f128d4c6c816b9d4762a57edac0e949d4 100644 (file)
@@ -1,4 +1,4 @@
-$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.6 2001/01/22 22:30:06 tgl Exp $
+$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.7 2001/01/25 03:31:16 tgl Exp $
 
 There are two fundamental lock structures: the per-lockable-object LOCK
 struct, and the per-lock-holder HOLDER struct.  A LOCK object exists
@@ -373,7 +373,8 @@ time with "C before B", which won't move C far enough up.  So we look for
 soft edges outgoing from C starting at the front of the wait queue.
 
 5. The working data structures needed by the deadlock detection code can
-be proven not to need more than MAXBACKENDS entries.  Therefore the
-working storage can be statically allocated instead of depending on
-palloc().  This is a good thing, since if the deadlock detector could
-fail for extraneous reasons, all the above safety proofs fall down.
+be limited to numbers of entries computed from MaxBackends.  Therefore,
+we can allocate the worst-case space needed during backend startup.
+This seems a safer approach than trying to allocate workspace on the fly;
+we don't want to risk having the deadlock detector run out of memory,
+else we really have no guarantees at all that deadlock will be detected.
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
new file mode 100644 (file)
index 0000000..aae635a
--- /dev/null
@@ -0,0 +1,734 @@
+/*-------------------------------------------------------------------------
+ *
+ * deadlock.c
+ *       POSTGRES deadlock detection code
+ *
+ * See src/backend/storage/lmgr/README for a description of the deadlock
+ * detection and resolution algorithms.
+ *
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.1 2001/01/25 03:31:16 tgl Exp $
+ *
+ *     Interface:
+ *
+ *     DeadLockCheck()
+ *     InitDeadLockChecking()
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "utils/memutils.h"
+
+
+/* One edge in the waits-for graph */
+typedef struct {
+       PROC   *waiter;                         /* the waiting process */
+       PROC   *blocker;                        /* the process it is waiting for */
+       int             pred;                           /* workspace for TopoSort */
+       int             link;                           /* workspace for TopoSort */
+} EDGE;
+
+/* One potential reordering of a lock's wait queue */
+typedef struct {
+       LOCK   *lock;                           /* the lock whose wait queue is described */
+       PROC  **procs;                          /* array of PROC *'s in new wait order */
+       int             nProcs;
+} WAIT_ORDER;
+
+
+static bool DeadLockCheckRecurse(PROC *proc);
+static bool TestConfiguration(PROC *startProc);
+static bool FindLockCycle(PROC *checkProc,
+                                                 EDGE *softEdges, int *nSoftEdges);
+static bool FindLockCycleRecurse(PROC *checkProc,
+                                                                EDGE *softEdges, int *nSoftEdges);
+static bool ExpandConstraints(EDGE *constraints, int nConstraints);
+static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
+                                        PROC **ordering);
+#ifdef DEBUG_DEADLOCK
+static void PrintLockQueue(LOCK *lock, const char *info);
+#endif
+
+
+/*
+ * Working space for the deadlock detector
+ */
+
+/* Workspace for FindLockCycle */
+static PROC **visitedProcs;            /* Array of visited procs */
+static int nVisitedProcs;
+/* Workspace for TopoSort */
+static PROC **topoProcs;               /* Array of not-yet-output procs */
+static int *beforeConstraints; /* Counts of remaining before-constraints */
+static int *afterConstraints;  /* List head for after-constraints */
+/* Output area for ExpandConstraints */
+static WAIT_ORDER *waitOrders; /* Array of proposed queue rearrangements */
+static int nWaitOrders;
+static PROC **waitOrderProcs;  /* Space for waitOrders queue contents */
+/* Current list of constraints being considered */
+static EDGE *curConstraints;
+static int nCurConstraints;
+static int maxCurConstraints;
+/* Storage space for results from FindLockCycle */
+static EDGE *possibleConstraints;
+static int nPossibleConstraints;
+static int maxPossibleConstraints;
+
+
+/*
+ * InitDeadLockChecking -- initialize deadlock checker during backend startup
+ *
+ * This does per-backend initialization of the deadlock checker; primarily,
+ * allocation of working memory for DeadLockCheck.  We do this per-backend
+ * since there's no percentage in making the kernel do copy-on-write
+ * inheritance of workspace from the postmaster.  We want to allocate the
+ * space at startup because the deadlock checker might be invoked when there's
+ * no free memory left.
+ */
+void
+InitDeadLockChecking(void)
+{
+       MemoryContext   oldcxt;
+
+       /* Make sure allocations are permanent */
+       oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+
+       /*
+        * FindLockCycle needs at most MaxBackends entries in visitedProcs[]
+        */
+       visitedProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
+
+       /*
+        * TopoSort needs to consider at most MaxBackends wait-queue entries,
+        * and it needn't run concurrently with FindLockCycle.
+        */
+       topoProcs = visitedProcs;       /* re-use this space */
+       beforeConstraints = (int *) palloc(MaxBackends * sizeof(int));
+       afterConstraints = (int *) palloc(MaxBackends * sizeof(int));
+
+       /*
+        * We need to consider rearranging at most MaxBackends/2 wait queues
+        * (since it takes at least two waiters in a queue to create a soft edge),
+        * and the expanded form of the wait queues can't involve more than
+        * MaxBackends total waiters.
+        */
+       waitOrders = (WAIT_ORDER *) palloc((MaxBackends/2) * sizeof(WAIT_ORDER));
+       waitOrderProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
+
+       /*
+        * Allow at most MaxBackends distinct constraints in a configuration.
+        * (Is this enough?  In practice it seems it should be, but I don't quite
+        * see how to prove it.  If we run out, we might fail to find a workable
+        * wait queue rearrangement even though one exists.)  NOTE that this
+        * number limits the maximum recursion depth of DeadLockCheckRecurse.
+        * Making it really big might potentially allow a stack-overflow problem.
+        */
+       maxCurConstraints = MaxBackends;
+       curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));
+
+       /*
+        * Allow up to 3*MaxBackends constraints to be saved without having to
+        * re-run TestConfiguration.  (This is probably more than enough, but
+        * we can survive if we run low on space by doing excess runs of
+        * TestConfiguration to re-compute constraint lists each time needed.)
+        * The last MaxBackends entries in possibleConstraints[] are reserved as
+        * output workspace for FindLockCycle.
+        */
+       maxPossibleConstraints = MaxBackends * 4;
+       possibleConstraints =
+               (EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE));
+
+       MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * DeadLockCheck -- Checks for deadlocks for a given process
+ *
+ * This code looks for deadlocks involving the given process.  If any
+ * are found, it tries to rearrange lock wait queues to resolve the
+ * deadlock.  If resolution is impossible, return TRUE --- the caller
+ * is then expected to abort the given proc's transaction.
+ *
+ * We can't block on user locks, so no sense testing for deadlock
+ * because there is no blocking, and no timer for the block.  So,
+ * only look at regular locks.
+ *
+ * We must have already locked the master lock before being called.
+ * NOTE: although the lockctl structure appears to allow each lock
+ * table to have a different spinlock, all locks that can block had
+ * better use the same spinlock, else this code will not be adequately
+ * interlocked!
+ */
+bool
+DeadLockCheck(PROC *proc)
+{
+       int                     i,
+                               j;
+
+       /* Initialize to "no constraints" */
+       nCurConstraints = 0;
+       nPossibleConstraints = 0;
+       nWaitOrders = 0;
+
+       /* Search for deadlocks and possible fixes */
+       if (DeadLockCheckRecurse(proc))
+               return true;                    /* cannot find a non-deadlocked state */
+
+       /* Apply any needed rearrangements of wait queues */
+       for (i = 0; i < nWaitOrders; i++)
+       {
+               LOCK   *lock = waitOrders[i].lock;
+               PROC  **procs = waitOrders[i].procs;
+               int             nProcs = waitOrders[i].nProcs;
+               PROC_QUEUE *waitQueue = &(lock->waitProcs);
+
+               Assert(nProcs == waitQueue->size);
+
+#ifdef DEBUG_DEADLOCK
+               PrintLockQueue(lock, "DeadLockCheck:");
+#endif
+
+               /* Reset the queue and re-add procs in the desired order */
+               ProcQueueInit(waitQueue);
+               for (j = 0; j < nProcs; j++)
+               {
+                       SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
+                       waitQueue->size++;
+               }
+
+#ifdef DEBUG_DEADLOCK
+               PrintLockQueue(lock, "rearranged to:");
+#endif
+       }
+       return false;
+}
+
+/*
+ * DeadLockCheckRecurse -- recursively search for valid orderings
+ *
+ * curConstraints[] holds the current set of constraints being considered
+ * by an outer level of recursion.  Add to this each possible solution
+ * constraint for any cycle detected at this level.
+ *
+ * Returns TRUE if no solution exists.  Returns FALSE if a deadlock-free
+ * state is attainable, in which case waitOrders[] shows the required
+ * rearrangements of lock wait queues (if any).
+ */
+static bool
+DeadLockCheckRecurse(PROC *proc)
+{
+       int                     nEdges;
+       int                     oldPossibleConstraints;
+       bool            savedList;
+       int                     i;
+
+       nEdges = TestConfiguration(proc);
+       if (nEdges < 0)
+               return true;                    /* hard deadlock --- no solution */
+       if (nEdges == 0)
+               return false;                   /* good configuration found */
+       if (nCurConstraints >= maxCurConstraints)
+               return true;                    /* out of room for active constraints? */
+       oldPossibleConstraints = nPossibleConstraints;
+       if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints)
+       {
+               /* We can save the edge list in possibleConstraints[] */
+               nPossibleConstraints += nEdges;
+               savedList = true;
+       }
+       else
+       {
+               /* Not room; will need to regenerate the edges on-the-fly */
+               savedList = false;
+       }
+       /*
+        * Try each available soft edge as an addition to the configuration.
+        */
+       for (i = 0; i < nEdges; i++)
+       {
+               if (!savedList && i > 0)
+               {
+                       /* Regenerate the list of possible added constraints */
+                       if (nEdges != TestConfiguration(proc))
+                               elog(FATAL, "DeadLockCheckRecurse: inconsistent results");
+               }
+               curConstraints[nCurConstraints] =
+                       possibleConstraints[oldPossibleConstraints+i];
+               nCurConstraints++;
+               if (!DeadLockCheckRecurse(proc))
+                       return false;           /* found a valid solution! */
+               /* give up on that added constraint, try again */
+               nCurConstraints--;
+       }
+       nPossibleConstraints = oldPossibleConstraints;
+       return true;                            /* no solution found */
+}
+
+
+/*--------------------
+ * Test a configuration (current set of constraints) for validity.
+ *
+ * Returns:
+ *             0: the configuration is good (no deadlocks)
+ *        -1: the configuration has a hard deadlock or is not self-consistent
+ *             >0: the configuration has one or more soft deadlocks
+ *
+ * In the soft-deadlock case, one of the soft cycles is chosen arbitrarily
+ * and a list of its soft edges is returned beginning at
+ * possibleConstraints+nPossibleConstraints.  The return value is the
+ * number of soft edges.
+ *--------------------
+ */
+static bool
+TestConfiguration(PROC *startProc)
+{
+       int             softFound = 0;
+       EDGE   *softEdges = possibleConstraints + nPossibleConstraints;
+       int             nSoftEdges;
+       int             i;
+
+       /*
+        * Make sure we have room for FindLockCycle's output.
+        */
+       if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
+               return -1;
+       /*
+        * Expand current constraint set into wait orderings.  Fail if the
+        * constraint set is not self-consistent.
+        */
+       if (!ExpandConstraints(curConstraints, nCurConstraints))
+               return -1;
+       /*
+        * Check for cycles involving startProc or any of the procs mentioned
+        * in constraints.  We check startProc last because if it has a soft
+        * cycle still to be dealt with, we want to deal with that first.
+        */
+       for (i = 0; i < nCurConstraints; i++)
+       {
+               if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges))
+               {
+                       if (nSoftEdges == 0)
+                               return -1;              /* hard deadlock detected */
+                       softFound = nSoftEdges;
+               }
+               if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges))
+               {
+                       if (nSoftEdges == 0)
+                               return -1;              /* hard deadlock detected */
+                       softFound = nSoftEdges;
+               }
+       }
+       if (FindLockCycle(startProc, softEdges, &nSoftEdges))
+       {
+               if (nSoftEdges == 0)
+                       return -1;                      /* hard deadlock detected */
+               softFound = nSoftEdges;
+       }
+       return softFound;
+}
+
+
+/*
+ * FindLockCycle -- basic check for deadlock cycles
+ *
+ * Scan outward from the given proc to see if there is a cycle in the
+ * waits-for graph that includes this proc.  Return TRUE if a cycle
+ * is found, else FALSE.  If a cycle is found, we also return a list of
+ * the "soft edges", if any, included in the cycle.  These edges could
+ * potentially be eliminated by rearranging wait queues.
+ *
+ * Since we need to be able to check hypothetical configurations that would
+ * exist after wait queue rearrangement, the routine pays attention to the
+ * table of hypothetical queue orders in waitOrders[].  These orders will
+ * be believed in preference to the actual ordering seen in the locktable.
+ */
+static bool
+FindLockCycle(PROC *checkProc,
+                         EDGE *softEdges,      /* output argument */
+                         int *nSoftEdges)      /* output argument */
+{
+       nVisitedProcs = 0;
+       *nSoftEdges = 0;
+       return FindLockCycleRecurse(checkProc, softEdges, nSoftEdges);
+}
+
+static bool
+FindLockCycleRecurse(PROC *checkProc,
+                                        EDGE *softEdges,       /* output argument */
+                                        int *nSoftEdges)       /* output argument */
+{
+       PROC       *proc;
+       LOCK       *lock;
+       HOLDER     *holder;
+       SHM_QUEUE  *lockHolders;
+       LOCKMETHODTABLE *lockMethodTable;
+       LOCKMETHODCTL *lockctl;
+       PROC_QUEUE *waitQueue;
+       int                     queue_size;
+       int                     conflictMask;
+       int                     i;
+       int                     numLockModes,
+                               lm;
+
+       /*
+        * Have we already seen this proc?
+        */
+       for (i = 0; i < nVisitedProcs; i++)
+       {
+               if (visitedProcs[i] == checkProc)
+               {
+                       /* If we return to starting point, we have a deadlock cycle */
+                       if (i == 0)
+                               return true;
+                       /*
+                        * Otherwise, we have a cycle but it does not include the start
+                        * point, so say "no deadlock".
+                        */
+                       return false;
+               }
+       }
+       /* Mark proc as seen */
+       Assert(nVisitedProcs < MaxBackends);
+       visitedProcs[nVisitedProcs++] = checkProc;
+       /*
+        * If the proc is not waiting, we have no outgoing waits-for edges.
+        */
+       if (checkProc->links.next == INVALID_OFFSET)
+               return false;
+       lock = checkProc->waitLock;
+       if (lock == NULL)
+               return false;
+       lockMethodTable = GetLocksMethodTable(lock);
+       lockctl = lockMethodTable->ctl;
+       numLockModes = lockctl->numLockModes;
+       conflictMask = lockctl->conflictTab[checkProc->waitLockMode];
+       /*
+        * Scan for procs that already hold conflicting locks.  These are
+        * "hard" edges in the waits-for graph.
+        */
+       lockHolders = &(lock->lockHolders);
+
+       holder = (HOLDER *) SHMQueueNext(lockHolders, lockHolders,
+                                                                        offsetof(HOLDER, lockLink));
+
+       while (holder)
+       {
+               proc = (PROC *) MAKE_PTR(holder->tag.proc);
+
+               /* A proc never blocks itself */
+               if (proc != checkProc)
+               {
+                       for (lm = 1; lm <= numLockModes; lm++)
+                       {
+                               if (holder->holding[lm] > 0 &&
+                                       ((1 << lm) & conflictMask) != 0)
+                               {
+                                       /* This proc hard-blocks checkProc */
+                                       if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+                                               return true;
+                                       /* If no deadlock, we're done looking at this holder */
+                                       break;
+                               }
+                       }
+               }
+
+               holder = (HOLDER *) SHMQueueNext(lockHolders, &holder->lockLink,
+                                                                                offsetof(HOLDER, lockLink));
+       }
+
+       /*
+        * Scan for procs that are ahead of this one in the lock's wait queue.
+        * Those that have conflicting requests soft-block this one.  This must
+        * be done after the hard-block search, since if another proc both
+        * hard- and soft-blocks this one, we want to call it a hard edge.
+        *
+        * If there is a proposed re-ordering of the lock's wait order,
+        * use that rather than the current wait order.
+        */
+       for (i = 0; i < nWaitOrders; i++)
+       {
+               if (waitOrders[i].lock == lock)
+                       break;
+       }
+
+       if (i < nWaitOrders)
+       {
+               /* Use the given hypothetical wait queue order */
+               PROC  **procs = waitOrders[i].procs;
+
+               queue_size = waitOrders[i].nProcs;
+
+               for (i = 0; i < queue_size; i++)
+               {
+                       proc = procs[i];
+
+                       /* Done when we reach the target proc */
+                       if (proc == checkProc)
+                               break;
+
+                       /* Is there a conflict with this guy's request? */
+                       if (((1 << proc->waitLockMode) & conflictMask) != 0)
+                       {
+                               /* This proc soft-blocks checkProc */
+                               if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+                               {
+                                       /* Add this edge to the list of soft edges in the cycle */
+                                       Assert(*nSoftEdges < MaxBackends);
+                                       softEdges[*nSoftEdges].waiter = checkProc;
+                                       softEdges[*nSoftEdges].blocker = proc;
+                                       (*nSoftEdges)++;
+                                       return true;
+                               }
+                       }
+               }
+       }
+       else
+       {
+               /* Use the true lock wait queue order */
+               waitQueue = &(lock->waitProcs);
+               queue_size = waitQueue->size;
+
+               proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+
+               while (queue_size-- > 0)
+               {
+                       /* Done when we reach the target proc */
+                       if (proc == checkProc)
+                               break;
+
+                       /* Is there a conflict with this guy's request? */
+                       if (((1 << proc->waitLockMode) & conflictMask) != 0)
+                       {
+                               /* This proc soft-blocks checkProc */
+                               if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+                               {
+                                       /* Add this edge to the list of soft edges in the cycle */
+                                       Assert(*nSoftEdges < MaxBackends);
+                                       softEdges[*nSoftEdges].waiter = checkProc;
+                                       softEdges[*nSoftEdges].blocker = proc;
+                                       (*nSoftEdges)++;
+                                       return true;
+                               }
+                       }
+
+                       proc = (PROC *) MAKE_PTR(proc->links.next);
+               }
+       }
+
+       /*
+        * No conflict detected here.
+        */
+       return false;
+}
+
+
+/*
+ * ExpandConstraints -- expand a list of constraints into a set of
+ *             specific new orderings for affected wait queues
+ *
+ * Input is a list of soft edges to be reversed.  The output is a list
+ * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PROC array
+ * workspace in waitOrderProcs[].
+ *
+ * Returns TRUE if able to build an ordering that satisfies all the
+ * constraints, FALSE if not (there are contradictory constraints).
+ */
+static bool
+ExpandConstraints(EDGE *constraints,
+                                 int nConstraints)
+{
+       int                     nWaitOrderProcs = 0;
+       int                     i,
+                               j;
+
+       nWaitOrders = 0;
+       /*
+        * Scan constraint list backwards.  This is because the last-added
+        * constraint is the only one that could fail, and so we want to test
+        * it for inconsistency first.
+        */
+       for (i = nConstraints; --i >= 0; )
+       {
+               PROC   *proc = constraints[i].waiter;
+               LOCK   *lock = proc->waitLock;
+
+               /* Did we already make a list for this lock? */
+               for (j = nWaitOrders; --j >= 0; )
+               {
+                       if (waitOrders[j].lock == lock)
+                               break;
+               }
+               if (j >= 0)
+                       continue;
+               /* No, so allocate a new list */
+               waitOrders[nWaitOrders].lock = lock;
+               waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
+               waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
+               nWaitOrderProcs += lock->waitProcs.size;
+               Assert(nWaitOrderProcs <= MaxBackends);
+               /*
+                * Do the topo sort.  TopoSort need not examine constraints after
+                * this one, since they must be for different locks.
+                */
+               if (!TopoSort(lock, constraints, i+1,
+                                         waitOrders[nWaitOrders].procs))
+                       return false;
+               nWaitOrders++;
+       }
+       return true;
+}
+
+
+/*
+ * TopoSort -- topological sort of a wait queue
+ *
+ * Generate a re-ordering of a lock's wait queue that satisfies given
+ * constraints about certain procs preceding others.  (Each such constraint
+ * is a fact of a partial ordering.)  Minimize rearrangement of the queue
+ * not needed to achieve the partial ordering.
+ *
+ * This is a lot simpler and slower than, for example, the topological sort
+ * algorithm shown in Knuth's Volume 1.  However, Knuth's method doesn't
+ * try to minimize the damage to the existing order.  In practice we are
+ * not likely to be working with more than a few constraints, so the apparent
+ * slowness of the algorithm won't really matter.
+ *
+ * The initial queue ordering is taken directly from the lock's wait queue.
+ * The output is an array of PROC pointers, of length equal to the lock's
+ * wait queue length (the caller is responsible for providing this space).
+ * The partial order is specified by an array of EDGE structs.  Each EDGE
+ * is one that we need to reverse, therefore the "waiter" must appear before
+ * the "blocker" in the output array.  The EDGE array may well contain
+ * edges associated with other locks; these should be ignored.
+ *
+ * Returns TRUE if able to build an ordering that satisfies all the
+ * constraints, FALSE if not (there are contradictory constraints).
+ */
+static bool
+TopoSort(LOCK *lock,
+                EDGE *constraints,
+                int nConstraints,
+                PROC **ordering)               /* output argument */
+{
+       PROC_QUEUE *waitQueue = &(lock->waitProcs);
+       int                     queue_size = waitQueue->size;
+       PROC       *proc;
+       int                     i,
+                               j,
+                               k,
+                               last;
+
+       /* First, fill topoProcs[] array with the procs in their current order */
+       proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+       for (i = 0; i < queue_size; i++)
+       {
+               topoProcs[i] = proc;
+               proc = (PROC *) MAKE_PTR(proc->links.next);
+       }
+
+       /*
+        * Scan the constraints, and for each proc in the array, generate a count
+        * of the number of constraints that say it must be before something else,
+        * plus a list of the constraints that say it must be after something else.
+        * The count for the j'th proc is stored in beforeConstraints[j], and the
+        * head of its list in afterConstraints[j].  Each constraint stores its
+        * list link in constraints[i].link (note any constraint will be in
+        * just one list).  The array index for the before-proc of the i'th
+        * constraint is remembered in constraints[i].pred.
+        */
+       MemSet(beforeConstraints, 0, queue_size * sizeof(int));
+       MemSet(afterConstraints, 0, queue_size * sizeof(int));
+       for (i = 0; i < nConstraints; i++)
+       {
+               proc = constraints[i].waiter;
+               /* Ignore constraint if not for this lock */
+               if (proc->waitLock != lock)
+                       continue;
+               /* Find the waiter proc in the array */
+               for (j = queue_size; --j >= 0; )
+               {
+                       if (topoProcs[j] == proc)
+                               break;
+               }
+               Assert(j >= 0);                 /* should have found a match */
+               /* Find the blocker proc in the array */
+               proc = constraints[i].blocker;
+               for (k = queue_size; --k >= 0; )
+               {
+                       if (topoProcs[k] == proc)
+                               break;
+               }
+               Assert(k >= 0);                 /* should have found a match */
+               beforeConstraints[j]++; /* waiter must come before */
+               /* add this constraint to list of after-constraints for blocker */
+               constraints[i].pred = j;
+               constraints[i].link = afterConstraints[k];
+               afterConstraints[k] = i+1;
+       }
+       /*--------------------
+        * Now scan the topoProcs array backwards.  At each step, output the
+        * last proc that has no remaining before-constraints, and decrease
+        * the beforeConstraints count of each of the procs it was constrained
+        * against.
+        * i = index of ordering[] entry we want to output this time
+        * j = search index for topoProcs[]
+        * k = temp for scanning constraint list for proc j
+        * last = last non-null index in topoProcs (avoid redundant searches)
+        *--------------------
+        */
+       last = queue_size-1;
+       for (i = queue_size; --i >= 0; )
+       {
+               /* Find next candidate to output */
+               while (topoProcs[last] == NULL)
+                       last--;
+               for (j = last; j >= 0; j--)
+               {
+                       if (topoProcs[j] != NULL && beforeConstraints[j] == 0)
+                               break;
+               }
+               /* If no available candidate, topological sort fails */
+               if (j < 0)
+                       return false;
+               /* Output candidate, and mark it done by zeroing topoProcs[] entry */
+               ordering[i] = topoProcs[j];
+               topoProcs[j] = NULL;
+               /* Update beforeConstraints counts of its predecessors */
+               for (k = afterConstraints[j]; k > 0; k = constraints[k-1].link)
+               {
+                       beforeConstraints[constraints[k-1].pred]--;
+               }
+       }
+
+       /* Done */
+       return true;
+}
+
+#ifdef DEBUG_DEADLOCK
+static void
+PrintLockQueue(LOCK *lock, const char *info)
+{
+       PROC_QUEUE *waitQueue = &(lock->waitProcs);
+       int                     queue_size = waitQueue->size;
+       PROC       *proc;
+       int                     i;
+
+       printf("%s lock %lx queue ", info, MAKE_OFFSET(lock));
+       proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+       for (i = 0; i < queue_size; i++)
+       {
+               printf(" %d", proc->pid);
+               proc = (PROC *) MAKE_PTR(proc->links.next);
+       }
+       printf("\n");
+       fflush(stdout);
+}
+#endif
index 3d77ab2b4d133011e10192355de3b93275487148..08e023718ebc766bd11495bd98ea7c09bdb79454 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.80 2001/01/24 19:43:08 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.81 2001/01/25 03:31:16 tgl Exp $
  *
  * NOTES
  *       Outside modules can create a lock table and acquire/release
  *
  *     LockAcquire(), LockRelease(), LockMethodTableInit(),
  *     LockMethodTableRename(), LockReleaseAll,
- *     LockResolveConflicts(), GrantLock()
+ *     LockCheckConflicts(), GrantLock()
  *
  *-------------------------------------------------------------------------
  */
+#include "postgres.h"
+
 #include <sys/types.h>
 #include <unistd.h>
 #include <signal.h>
 
-#include "postgres.h"
-
 #include "access/xact.h"
 #include "miscadmin.h"
 #include "storage/proc.h"
@@ -44,7 +44,6 @@ static int    WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
                                           LOCK *lock, HOLDER *holder);
 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
                                                         int *myHolding);
-static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
 
 static char *lock_types[] =
 {
@@ -211,6 +210,18 @@ LockingDisabled(void)
        return LockingIsDisabled;
 }
 
+/*
+ * Fetch the lock method table associated with a given lock
+ */
+LOCKMETHODTABLE *
+GetLocksMethodTable(LOCK *lock)
+{
+       LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock);
+
+       Assert(lockmethod > 0 && lockmethod < NumLockMethods);
+       return LockMethodTable[lockmethod];
+}
+
 
 /*
  * LockMethodInit -- initialize the lock table's lock type
@@ -559,7 +570,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        if (!holder)
        {
                SpinRelease(masterLock);
-               elog(NOTICE, "LockAcquire: holder table corrupted");
+               elog(FATAL, "LockAcquire: holder table corrupted");
                return FALSE;
        }
 
@@ -623,11 +634,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
        /* --------------------
-        * If I'm the only one holding any lock on this object, then there
-        * cannot be a conflict. The same is true if I already hold this lock.
+        * If I already hold one or more locks of the requested type,
+        * just grant myself another one without blocking.
         * --------------------
         */
-       if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0)
+       if (holder->holding[lockmode] > 0)
        {
                GrantLock(lock, holder, lockmode);
                HOLDER_PRINT("LockAcquire: owning", holder);
@@ -637,11 +648,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 
        /* --------------------
         * If this process (under any XID) is a holder of the lock,
-        * then there is no conflict, either.
+        * also grant myself another one without blocking.
         * --------------------
         */
        LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
-       if (myHolding[lockmode] != 0)
+       if (myHolding[lockmode] > 0)
        {
                GrantLock(lock, holder, lockmode);
                HOLDER_PRINT("LockAcquire: my other XID owning", holder);
@@ -649,42 +660,27 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                return TRUE;
        }
 
-       /*
-        * If lock requested conflicts with locks requested by waiters...
+       /* --------------------
+        * If lock requested conflicts with locks requested by waiters,
+        * must join wait queue.  Otherwise, check for conflict with
+        * already-held locks.  (That's last because most complex check.)
+        * --------------------
         */
        if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
-       {
-               /*
-                * If my process doesn't hold any locks that conflict with waiters
-                * then force to sleep, so that prior waiters get first chance.
-                */
-               for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
-               {
-                       if (myHolding[i] > 0 &&
-                               lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
-                               break;                  /* yes, there is a conflict */
-               }
-
-               if (i > lockMethodTable->ctl->numLockModes)
-               {
-                       HOLDER_PRINT("LockAcquire: another proc already waiting",
-                                                holder);
-                       status = STATUS_FOUND;
-               }
-               else
-                       status = LockResolveConflicts(lockmethod, lockmode,
-                                                                                 lock, holder,
-                                                                                 MyProc, myHolding);
-       }
+               status = STATUS_FOUND;
        else
-               status = LockResolveConflicts(lockmethod, lockmode,
-                                                                         lock, holder,
-                                                                         MyProc, myHolding);
+               status = LockCheckConflicts(lockMethodTable, lockmode,
+                                                                       lock, holder,
+                                                                       MyProc, myHolding);
 
        if (status == STATUS_OK)
+       {
+               /* No conflict with held or previously requested locks */
                GrantLock(lock, holder, lockmode);
-       else if (status == STATUS_FOUND)
+       }
+       else
        {
+               Assert(status == STATUS_FOUND);
 #ifdef USER_LOCKS
 
                /*
@@ -765,49 +761,50 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 }
 
 /* ----------------------------
- * LockResolveConflicts -- test for lock conflicts
+ * LockCheckConflicts -- test whether requested lock conflicts
+ *             with those already granted
+ *
+ * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
  *
  * NOTES:
- *             Here's what makes this complicated: one transaction's
- * locks don't conflict with one another.  When many processes
- * hold locks, each has to subtract off the other's locks when
- * determining whether or not any new lock acquired conflicts with
- * the old ones.
+ *             Here's what makes this complicated: one process's locks don't
+ * conflict with one another, even if they are held under different
+ * transaction IDs (eg, session and xact locks do not conflict).
+ * So, we must subtract off our own locks when determining whether the
+ * requested new lock conflicts with those already held.
  *
  * The caller can optionally pass the process's total holding counts, if
  * known.  If NULL is passed then these values will be computed internally.
  * ----------------------------
  */
 int
-LockResolveConflicts(LOCKMETHOD lockmethod,
-                                        LOCKMODE lockmode,
-                                        LOCK *lock,
-                                        HOLDER *holder,
-                                        PROC *proc,
-                                        int *myHolding)                /* myHolding[] array or NULL */
+LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+                                  LOCKMODE lockmode,
+                                  LOCK *lock,
+                                  HOLDER *holder,
+                                  PROC *proc,
+                                  int *myHolding)              /* myHolding[] array or NULL */
 {
-       LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
+       LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
        int                     numLockModes = lockctl->numLockModes;
        int                     bitmask;
        int                     i,
                                tmpMask;
        int                     localHolding[MAX_LOCKMODES];
 
-       Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
-
        /* ----------------------------
         * first check for global conflicts: If no locks conflict
-        * with mine, then I get the lock.
+        * with my request, then I get the lock.
         *
         * Checking for conflict: lock->grantMask represents the types of
         * currently held locks.  conflictTable[lockmode] has a bit
-        * set for each type of lock that conflicts with mine.  Bitwise
+        * set for each type of lock that conflicts with request.       Bitwise
         * compare tells if there is a conflict.
         * ----------------------------
         */
        if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
        {
-               HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
+               HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
                return STATUS_OK;
        }
 
@@ -844,11 +841,11 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
        if (!(lockctl->conflictTab[lockmode] & bitmask))
        {
                /* no conflict. OK to get the lock */
-               HOLDER_PRINT("LockResolveConflicts: resolved", holder);
+               HOLDER_PRINT("LockCheckConflicts: resolved", holder);
                return STATUS_OK;
        }
 
-       HOLDER_PRINT("LockResolveConflicts: conflicting", holder);
+       HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
        return STATUS_FOUND;
 }
 
@@ -889,33 +886,12 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
        }
 }
 
-/*
- * LockGetMyHeldLocks -- compute bitmask of lock types held by a process
- *             for a given lockable object.
- */
-static int
-LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
-{
-       int                     myHolding[MAX_LOCKMODES];
-       int                     heldLocks = 0;
-       int                     i,
-                               tmpMask;
-
-       LockCountMyLocks(lockOffset, proc, myHolding);
-
-       for (i = 1, tmpMask = 2;
-                i < MAX_LOCKMODES;
-                i++, tmpMask <<= 1)
-       {
-               if (myHolding[i] > 0)
-                       heldLocks |= tmpMask;
-       }
-       return heldLocks;
-}
-
 /*
  * GrantLock -- update the lock and holder data structures to show
  *             the lock request has been granted.
+ *
+ * NOTE: if proc was blocked, it also needs to be removed from the wait list
+ * and have its waitLock/waitHolder fields cleared.  That's not done here.
  */
 void
 GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
@@ -936,6 +912,9 @@ GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
 /*
  * WaitOnLock -- wait to acquire a lock
  *
+ * Caller must have set MyProc->heldLocks to reflect locks already held
+ * on the lockable object by this process (under all XIDs).
+ *
  * The locktable spinlock must be held at entry.
  */
 static int
@@ -956,7 +935,7 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
        strcat(new_status, " waiting");
        set_ps_display(new_status);
 
-       /*
+       /* -------------------
         * NOTE: Think not to put any lock state cleanup after the call to
         * ProcSleep, in either the normal or failure path.  The lock state
         * must be fully set by the lock grantor, or by HandleDeadLock if we
@@ -965,12 +944,13 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
         * after someone else grants us the lock, but before we've noticed it.
         * Hence, after granting, the locktable state must fully reflect the
         * fact that we own the lock; we can't do additional work on return.
+        * -------------------
         */
 
-       if (ProcSleep(lockMethodTable->ctl,
+       if (ProcSleep(lockMethodTable,
                                  lockmode,
                                  lock,
-                                 holder) != NO_ERROR)
+                                 holder) != STATUS_OK)
        {
                /* -------------------
                 * We failed as a result of a deadlock, see HandleDeadLock().
@@ -992,14 +972,60 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
        return STATUS_OK;
 }
 
+/*--------------------
+ * Remove a proc from the wait-queue it is on
+ * (caller must know it is on one).
+ *
+ * Locktable lock must be held by caller.
+ *
+ * NB: this does not remove the process' holder object, nor the lock object,
+ * even though their counts might now have gone to zero.  That will happen
+ * during a subsequent LockReleaseAll call, which we expect will happen
+ * during transaction cleanup.  (Removal of a proc from its wait queue by
+ * this routine can only happen if we are aborting the transaction.)
+ *--------------------
+ */
+void
+RemoveFromWaitQueue(PROC *proc)
+{
+       LOCK   *waitLock = proc->waitLock;
+       LOCKMODE lockmode = proc->waitLockMode;
+
+       /* Make sure proc is waiting */
+       Assert(proc->links.next != INVALID_OFFSET);
+       Assert(waitLock);
+       Assert(waitLock->waitProcs.size > 0);
+
+       /* Remove proc from lock's wait queue */
+       SHMQueueDelete(&(proc->links));
+       waitLock->waitProcs.size--;
+
+       /* Undo increments of request counts by waiting process */
+       Assert(waitLock->nRequested > 0);
+       Assert(waitLock->nRequested > proc->waitLock->nGranted);
+       waitLock->nRequested--;
+       Assert(waitLock->requested[lockmode] > 0);
+       waitLock->requested[lockmode]--;
+       /* don't forget to clear waitMask bit if appropriate */
+       if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
+               waitLock->waitMask &= BITS_OFF[lockmode];
+
+       /* Clean up the proc's own state */
+       proc->waitLock = NULL;
+       proc->waitHolder = NULL;
+
+       /* See if any other waiters for the lock can be woken up now */
+       ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
+}
+
 /*
  * LockRelease -- look up 'locktag' in lock table 'lockmethod' and
- *             release it.
+ *             release one 'lockmode' lock on it.
  *
- * Side Effects: if the lock no longer conflicts with the highest
- *             priority waiting process, that process is granted the lock
- *             and awoken. (We have to grant the lock here to avoid a
- *             race between the waking process and any new process to
+ * Side Effects: find any waiting processes that are now wakable,
+ *             grant them their requested locks and awaken them.
+ *             (We have to grant the lock here to avoid a race between
+ *             the waking process and any new process to
  *             come along and request the lock.)
  */
 bool
@@ -1013,7 +1039,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
        HOLDER     *holder;
        HOLDERTAG       holdertag;
        HTAB       *holderTable;
-       bool            wakeupNeeded = true;
+       bool            wakeupNeeded = false;
 
 #ifdef LOCK_DEBUG
        if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
@@ -1086,7 +1112,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                return FALSE;
        }
        HOLDER_PRINT("LockRelease: found", holder);
-       Assert(holder->tag.lock == MAKE_OFFSET(lock));
 
        /*
         * Check that we are actually holding a lock of the type we want to
@@ -1094,11 +1119,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
         */
        if (!(holder->holding[lockmode] > 0))
        {
-               SpinRelease(masterLock);
                HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
+               Assert(holder->holding[lockmode] >= 0);
+               SpinRelease(masterLock);
                elog(NOTICE, "LockRelease: you don't own a lock of type %s",
                         lock_types[lockmode]);
-               Assert(holder->holding[lockmode] >= 0);
                return FALSE;
        }
        Assert(holder->nHolding > 0);
@@ -1120,34 +1145,24 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                lock->grantMask &= BITS_OFF[lockmode];
        }
 
-#ifdef NOT_USED
+       LOCK_PRINT("LockRelease: updated", lock, lockmode);
+       Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
+       Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
+       Assert(lock->nGranted <= lock->nRequested);
+
        /* --------------------------
-        * If there are still active locks of the type I just released, no one
-        * should be woken up.  Whoever is asleep will still conflict
-        * with the remaining locks.
+        * We need only run ProcLockWakeup if the released lock conflicts with
+        * at least one of the lock types requested by waiter(s).  Otherwise
+        * whatever conflict made them wait must still exist.  NOTE: before MVCC,
+        * we could skip wakeup if lock->granted[lockmode] was still positive.
+        * But that's not true anymore, because the remaining granted locks might
+        * belong to some waiter, who could now be awakened because he doesn't
+        * conflict with his own locks.
         * --------------------------
         */
-       if (lock->granted[lockmode])
-               wakeupNeeded = false;
-       else
-#endif
-
-               /*
-                * Above is not valid any more (due to MVCC lock modes). Actually
-                * we should compare granted[lockmode] with number of
-                * waiters holding lock of this type and try to wakeup only if
-                * these numbers are equal (and lock released conflicts with locks
-                * requested by waiters). For the moment we only check the last
-                * condition.
-                */
        if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
                wakeupNeeded = true;
 
-       LOCK_PRINT("LockRelease: updated", lock, lockmode);
-       Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
-       Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
-       Assert(lock->nGranted <= lock->nRequested);
-
        if (lock->nRequested == 0)
        {
                /* ------------------
@@ -1161,8 +1176,13 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                                                                        (Pointer) &(lock->tag),
                                                                        HASH_REMOVE,
                                                                        &found);
-               Assert(lock && found);
-               wakeupNeeded = false;
+               if (!lock || !found)
+               {
+                       SpinRelease(masterLock);
+                       elog(NOTICE, "LockRelease: remove lock, table corrupted");
+                       return FALSE;
+               }
+               wakeupNeeded = false;   /* should be false, but make sure */
        }
 
        /*
@@ -1192,12 +1212,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                }
        }
 
+       /*
+        * Wake up waiters if needed.
+        */
        if (wakeupNeeded)
-               ProcLockWakeup(lockmethod, lock);
-#ifdef LOCK_DEBUG
-       else if (LOCK_DEBUG_ENABLED(lock))
-        elog(DEBUG, "LockRelease: no wakeup needed");
-#endif
+               ProcLockWakeup(lockMethodTable, lock);
 
        SpinRelease(masterLock);
        return TRUE;
@@ -1310,8 +1329,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                else
                {
                        /* --------------
-                        * set nRequested to zero so that we can garbage collect the lock
-                        * down below...
+                        * This holder accounts for all the requested locks on the object,
+                        * so we can be lazy and just zero things out.
                         * --------------
                         */
                        lock->nRequested = 0;
@@ -1347,7 +1366,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                        return FALSE;
                }
 
-               if (!lock->nRequested)
+               if (lock->nRequested == 0)
                {
                        /* --------------------
                         * We've just released the last lock, so garbage-collect the
@@ -1359,7 +1378,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                        lock = (LOCK *) hash_search(lockMethodTable->lockHash,
                                                                                (Pointer) &(lock->tag),
                                                                                HASH_REMOVE, &found);
-                       if ((!lock) || (!found))
+                       if (!lock || !found)
                        {
                                SpinRelease(masterLock);
                                elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
@@ -1367,7 +1386,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                        }
                }
                else if (wakeupNeeded)
-                       ProcLockWakeup(lockmethod, lock);
+                       ProcLockWakeup(lockMethodTable, lock);
 
 next_item:
                holder = nextHolder;
@@ -1412,245 +1431,6 @@ LockShmemSize(int maxBackends)
        return size;
 }
 
-/*
- * DeadLockCheck -- Checks for deadlocks for a given process
- *
- * This code takes a list of locks a process holds, and the lock that
- * the process is sleeping on, and tries to find if any of the processes
- * waiting on its locks hold the lock it is waiting for.  If no deadlock
- * is found, it goes on to look at all the processes waiting on their locks.
- *
- * We can't block on user locks, so no sense testing for deadlock
- * because there is no blocking, and no timer for the block.  So,
- * only look at regular locks.
- *
- * We have already locked the master lock before being called.
- */
-bool
-DeadLockCheck(PROC *thisProc, LOCK *findlock)
-{
-       PROC       *waitProc;
-       PROC_QUEUE *waitQueue;
-       SHM_QUEUE  *procHolders = &(thisProc->procHolders);
-       HOLDER     *holder;
-       HOLDER     *nextHolder;
-       LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
-       LOCK       *lock;
-       int                     i,
-                               j;
-       bool            first_run = (thisProc == MyProc);
-
-       static PROC *checked_procs[MAXBACKENDS];
-       static int      nprocs;
-
-       /* initialize at start of recursion */
-       if (first_run)
-       {
-               checked_procs[0] = thisProc;
-               nprocs = 1;
-       }
-
-       /*
-        * Scan over all the locks held/awaited by thisProc.
-        */
-       holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
-                                                                        offsetof(HOLDER, procLink));
-
-       while (holder)
-       {
-               /* Get link first, since we may unlink/delete this holder */
-               nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
-                                                                                        offsetof(HOLDER, procLink));
-
-               Assert(holder->tag.proc == MAKE_OFFSET(thisProc));
-
-               lock = (LOCK *) MAKE_PTR(holder->tag.lock);
-
-               /* Ignore user locks */
-               if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD)
-                       goto nxtl;
-
-               HOLDER_PRINT("DeadLockCheck", holder);
-               LOCK_PRINT("DeadLockCheck", lock, 0);
-
-               /*
-                * waitLock is always in procHolders of waiting proc, if !first_run
-                * then upper caller will handle waitProcs queue of waitLock.
-                */
-               if (thisProc->waitLock == lock && !first_run)
-                       goto nxtl;
-
-               /*
-                * If we found proc holding findlock and sleeping on some my other
-                * lock then we have to check does it block me or another waiters.
-                */
-               if (lock == findlock && !first_run)
-               {
-                       int                     lm;
-
-                       Assert(holder->nHolding > 0);
-                       for (lm = 1; lm <= lockctl->numLockModes; lm++)
-                       {
-                               if (holder->holding[lm] > 0 &&
-                                       lockctl->conflictTab[lm] & findlock->waitMask)
-                                       return true;
-                       }
-
-                       /*
-                        * Else - get the next lock from thisProc's procHolders
-                        */
-                       goto nxtl;
-               }
-
-               waitQueue = &(lock->waitProcs);
-               waitProc = (PROC *) MAKE_PTR(waitQueue->links.next);
-
-               /*
-                * Inner loop scans over all processes waiting for this lock.
-                *
-                * NOTE: loop must count down because we want to examine each item
-                * in the queue even if waitQueue->size decreases due to waking up
-                * some of the processes.
-                */
-               for (i = waitQueue->size; --i >= 0; )
-               {
-                       Assert(waitProc->waitLock == lock);
-                       if (waitProc == thisProc)
-                       {
-                               /* This should only happen at first level */
-                               Assert(waitProc == MyProc);
-                               goto nextWaitProc;
-                       }
-                       if (lock == findlock)           /* first_run also true */
-                       {
-                               /*
-                                * If I'm blocked by his heldLocks...
-                                */
-                               if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks)
-                               {
-                                       /* and he blocked by me -> deadlock */
-                                       if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
-                                               return true;
-                                       /* we shouldn't look at procHolders of our blockers */
-                                       goto nextWaitProc;
-                               }
-
-                               /*
-                                * If he isn't blocked by me and we request
-                                * non-conflicting lock modes - no deadlock here because
-                                * he isn't blocked by me in any sense (explicitly or
-                                * implicitly). Note that we don't do like test if
-                                * !first_run (when thisProc is holder and non-waiter on
-                                * lock) and so we call DeadLockCheck below for every
-                                * waitProc in thisProc->procHolders, even for waitProc-s
-                                * un-blocked by thisProc. Should we? This could save us
-                                * some time...
-                                */
-                               if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) &&
-                                       !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
-                                       goto nextWaitProc;
-                       }
-
-                       /*
-                        * Skip this waiter if already checked.
-                        */
-                       for (j = 0; j < nprocs; j++)
-                       {
-                               if (checked_procs[j] == waitProc)
-                                       goto nextWaitProc;
-                       }
-
-                       /* Recursively check this process's procHolders. */
-                       Assert(nprocs < MAXBACKENDS);
-                       checked_procs[nprocs++] = waitProc;
-
-                       if (DeadLockCheck(waitProc, findlock))
-                       {
-                               int                     heldLocks;
-
-                               /*
-                                * Ok, but is waitProc waiting for me (thisProc) ?
-                                */
-                               if (thisProc->waitLock == lock)
-                               {
-                                       Assert(first_run);
-                                       heldLocks = thisProc->heldLocks;
-                               }
-                               else
-                               {
-                                       /* should we cache heldLocks to speed this up? */
-                                       heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc);
-                                       Assert(heldLocks != 0);
-                               }
-                               if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks)
-                               {
-                                       /*
-                                        * Last attempt to avoid deadlock: try to wakeup myself.
-                                        */
-                                       if (first_run)
-                                       {
-                                               if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
-                                                                                                MyProc->waitLockMode,
-                                                                                                MyProc->waitLock,
-                                                                                                MyProc->waitHolder,
-                                                                                                MyProc,
-                                                                                                NULL) == STATUS_OK)
-                                               {
-                                                       GrantLock(MyProc->waitLock,
-                                                                         MyProc->waitHolder,
-                                                                         MyProc->waitLockMode);
-                                                       ProcWakeup(MyProc, NO_ERROR);
-                                                       return false;
-                                               }
-                                       }
-                                       return true;
-                               }
-
-                               /*
-                                * Hell! Is he blocked by any (other) holder ?
-                                */
-                               if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
-                                                                                waitProc->waitLockMode,
-                                                                                lock,
-                                                                                waitProc->waitHolder,
-                                                                                waitProc,
-                                                                                NULL) != STATUS_OK)
-                               {
-                                       /*
-                                        * Blocked by others - no deadlock...
-                                        */
-                                       LOCK_PRINT("DeadLockCheck: blocked by others",
-                                                          lock, waitProc->waitLockMode);
-                                       goto nextWaitProc;
-                               }
-
-                               /*
-                                * Well - wakeup this guy! This is the case of
-                                * implicit blocking: thisProc blocked someone who
-                                * blocked waitProc by the fact that he/someone is
-                                * already waiting for lock.  We do this for
-                                * anti-starving.
-                                */
-                               GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode);
-                               waitProc = ProcWakeup(waitProc, NO_ERROR);
-                               /*
-                                * Use next-proc link returned by ProcWakeup, since this
-                                * proc's own links field is now cleared.
-                                */
-                               continue;
-                       }
-
-nextWaitProc:
-                       waitProc = (PROC *) MAKE_PTR(waitProc->links.next);
-               }
-
-nxtl:
-               holder = nextHolder;
-       }
-
-       /* if we got here, no deadlock */
-       return false;
-}
 
 #ifdef LOCK_DEBUG
 /*
index 377e9dbeb58261fd5cf8b8b48eb96c14989fd96b..fd4c4b1485687e78effde87c01a88c48dc684e65 100644 (file)
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.97 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -18,7 +18,7 @@
  *
  *
  * Interface (a):
- *             ProcSleep(), ProcWakeup(), ProcWakeupNext(),
+ *             ProcSleep(), ProcWakeup(),
  *             ProcQueueAlloc() -- create a shm queue for sleeping processes
  *             ProcQueueInit() -- create a queue without allocing memory
  *
@@ -47,8 +47,6 @@
  *             shared among backends (we keep a few sets of semaphores around).
  *             This is so that we can support more backends. (system-wide semaphore
  *             sets run out pretty fast.)                                -ay 4/95
- *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
  */
 #include "postgres.h"
 
@@ -257,7 +255,7 @@ InitProcess(void)
        }
 
        SHMQueueElemInit(&(MyProc->links));
-       MyProc->errType = NO_ERROR;
+       MyProc->errType = STATUS_OK;
        MyProc->pid = MyProcPid;
        MyProc->databaseId = MyDatabaseId;
        MyProc->xid = InvalidTransactionId;
@@ -284,7 +282,16 @@ InitProcess(void)
                (location != MAKE_OFFSET(MyProc)))
                elog(STOP, "InitProcess: ShmemPID table broken");
 
+       /*
+        * Arrange to clean up at backend exit.
+        */
        on_shmem_exit(ProcKill, 0);
+
+       /*
+        * Now that we have a PROC, we could try to acquire locks,
+        * so initialize the deadlock checker.
+        */
+       InitDeadLockChecking();
 }
 
 /*
@@ -304,50 +311,6 @@ ZeroProcSemaphore(PROC *proc)
        }
 }
 
-/*
- * Remove a proc from the wait-queue it is on
- * (caller must know it is on one).
- * Locktable lock must be held by caller.
- *
- * NB: this does not remove the process' holder object, nor the lock object,
- * even though their counts might now have gone to zero.  That will happen
- * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup.  (Removal of a proc from its wait queue by
- * this routine can only happen if we are aborting the transaction.)
- */
-static void
-RemoveFromWaitQueue(PROC *proc)
-{
-       LOCK   *waitLock = proc->waitLock;
-       LOCKMODE lockmode = proc->waitLockMode;
-
-       /* Make sure proc is waiting */
-       Assert(proc->links.next != INVALID_OFFSET);
-       Assert(waitLock);
-       Assert(waitLock->waitProcs.size > 0);
-
-       /* Remove proc from lock's wait queue */
-       SHMQueueDelete(&(proc->links));
-       waitLock->waitProcs.size--;
-
-       /* Undo increments of request counts by waiting process */
-       Assert(waitLock->nRequested > 0);
-       Assert(waitLock->nRequested > proc->waitLock->nGranted);
-       waitLock->nRequested--;
-       Assert(waitLock->requested[lockmode] > 0);
-       waitLock->requested[lockmode]--;
-       /* don't forget to clear waitMask bit if appropriate */
-       if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
-               waitLock->waitMask &= ~(1 << lockmode);
-
-       /* Clean up the proc's own state */
-       proc->waitLock = NULL;
-       proc->waitHolder = NULL;
-
-       /* See if any other waiters for the lock can be woken up now */
-       ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock);
-}
-
 /*
  * Cancel any pending wait for lock, when aborting a transaction.
  *
@@ -529,34 +492,34 @@ ProcQueueInit(PROC_QUEUE *queue)
 /*
  * ProcSleep -- put a process to sleep
  *
- * P() on the semaphore should put us to sleep.  The process
- * semaphore is normally zero, so when we try to acquire it, we sleep.
+ * Caller must have set MyProc->heldLocks to reflect locks already held
+ * on the lockable object by this process (under all XIDs).
  *
  * Locktable's spinlock must be held at entry, and will be held
  * at exit.
  *
- * Result is NO_ERROR if we acquired the lock, STATUS_ERROR if not (deadlock).
+ * Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).
  *
  * ASSUME: that no one will fiddle with the queue until after
  *             we release the spin lock.
  *
  * NOTES: The process queue is now a priority queue for locking.
+ *
+ * P() on the semaphore should put us to sleep.  The process
+ * semaphore is normally zero, so when we try to acquire it, we sleep.
  */
 int
-ProcSleep(LOCKMETHODCTL *lockctl,
+ProcSleep(LOCKMETHODTABLE *lockMethodTable,
                  LOCKMODE lockmode,
                  LOCK *lock,
                  HOLDER *holder)
 {
-       PROC_QUEUE *waitQueue = &(lock->waitProcs);
+       LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
        SPINLOCK        spinlock = lockctl->masterLock;
-       int                     myMask = (1 << lockmode);
-       int                     waitMask = lock->waitMask;
+       PROC_QUEUE *waitQueue = &(lock->waitProcs);
+       int                     myHeldLocks = MyProc->heldLocks;
        PROC       *proc;
        int                     i;
-       int                     aheadGranted[MAX_LOCKMODES];
-       bool            selfConflict = (lockctl->conflictTab[lockmode] & myMask),
-                               prevSame = false;
 #ifndef __BEOS__
        struct itimerval timeval,
                                dummy;
@@ -564,64 +527,63 @@ ProcSleep(LOCKMETHODCTL *lockctl,
     bigtime_t time_interval;
 #endif
 
-       proc = (PROC *) MAKE_PTR(waitQueue->links.next);
-
-       /* if we don't conflict with any waiter - be first in queue */
-       if (!(lockctl->conflictTab[lockmode] & waitMask))
-               goto ins;
-
-       /* otherwise, determine where we should go into the queue */
-       for (i = 1; i < MAX_LOCKMODES; i++)
-               aheadGranted[i] = lock->granted[i];
-       (aheadGranted[lockmode])++;
-
-       for (i = 0; i < waitQueue->size; i++)
+       /* ----------------------
+        * Determine where to add myself in the wait queue.
+        *
+        * Normally I should go at the end of the queue.  However, if I already
+        * hold locks that conflict with the request of any previous waiter,
+        * put myself in the queue just in front of the first such waiter.
+        * This is not a necessary step, since deadlock detection would move
+        * me to before that waiter anyway; but it's relatively cheap to detect
+        * such a conflict immediately, and avoid delaying till deadlock timeout.
+        *
+        * Special case: if I find I should go in front of the first waiter,
+        * and I do not conflict with already-held locks, then just grant myself
+        * the requested lock immediately.
+        * ----------------------
+        */
+       if (myHeldLocks != 0)
        {
-               LOCKMODE        procWaitMode = proc->waitLockMode;
-
-               /* must I wait for him ? */
-               if (lockctl->conflictTab[lockmode] & proc->heldLocks)
+               proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+               for (i = 0; i < waitQueue->size; i++)
                {
-                       /* is he waiting for me ? */
-                       if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
+                       /* Must he wait for me? */
+                       if (lockctl->conflictTab[proc->waitLockMode] & myHeldLocks)
                        {
-                               /* Yes, report deadlock failure */
-                               MyProc->errType = STATUS_ERROR;
-                               return STATUS_ERROR;
-                       }
-                       /* I must go after him in queue - so continue loop */
-               }
-               /* if he waits for me, go before him in queue */
-               else if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
-                       break;
-               /* if conflicting locks requested */
-               else if (lockctl->conflictTab[procWaitMode] & myMask)
-               {
-
-                       /*
-                        * If I request non self-conflicting lock and there are others
-                        * requesting the same lock just before this guy - stop here.
-                        */
-                       if (!selfConflict && prevSame)
+                               /* Must I wait for him ? */
+                               if (lockctl->conflictTab[lockmode] & proc->heldLocks)
+                               {
+                                       /* Yes, can report deadlock failure immediately */
+                                       MyProc->errType = STATUS_ERROR;
+                                       return STATUS_ERROR;
+                               }
+                               if (i == 0)
+                               {
+                                       /* I must go before first waiter.  Check special case. */
+                                       if (LockCheckConflicts(lockMethodTable,
+                                                                                  lockmode,
+                                                                                  lock,
+                                                                                  holder,
+                                                                                  MyProc,
+                                                                                  NULL) == STATUS_OK)
+                                       {
+                                               /* Skip the wait and just grant myself the lock. */
+                                               GrantLock(lock, holder, lockmode);
+                                               return STATUS_OK;
+                                       }
+                               }
+                               /* Break out of loop to put myself before him */
                                break;
+                       }
+                       proc = (PROC *) MAKE_PTR(proc->links.next);
                }
-
-               /*
-                * Last attempt to not move any further to the back of the queue:
-                * if we don't conflict with remaining waiters, stop here.
-                */
-               else if (!(lockctl->conflictTab[lockmode] & waitMask))
-                       break;
-
-               /* Move past this guy, and update state accordingly */
-               prevSame = (procWaitMode == lockmode);
-               (aheadGranted[procWaitMode])++;
-               if (aheadGranted[procWaitMode] == lock->requested[procWaitMode])
-                       waitMask &= ~(1 << procWaitMode);
-               proc = (PROC *) MAKE_PTR(proc->links.next);
+       }
+       else
+       {
+               /* I hold no locks, so I can't push in front of anyone. */
+               proc = (PROC *) &(waitQueue->links);
        }
 
-ins:;
        /* -------------------
         * Insert self into queue, ahead of the given proc (or at tail of queue).
         * -------------------
@@ -629,15 +591,14 @@ ins:;
        SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
        waitQueue->size++;
 
-       lock->waitMask |= myMask;
+       lock->waitMask |= (1 << lockmode);
 
        /* Set up wait information in PROC object, too */
        MyProc->waitLock = lock;
        MyProc->waitHolder = holder;
        MyProc->waitLockMode = lockmode;
-       /* We assume the caller set up MyProc->heldLocks */
 
-       MyProc->errType = NO_ERROR;             /* initialize result for success */
+       MyProc->errType = STATUS_OK; /* initialize result for success */
 
        /* mark that we are waiting for a lock */
        waitingForLock = true;
@@ -662,7 +623,7 @@ ins:;
         * By delaying the check until we've waited for a bit, we can avoid
         * running the rather expensive deadlock-check code in most cases.
         *
-        * Need to zero out struct to set the interval and the micro seconds fields
+        * Need to zero out struct to set the interval and the microseconds fields
         * to 0.
         * --------------
         */
@@ -768,89 +729,59 @@ ProcWakeup(PROC *proc, int errType)
 
 /*
  * ProcLockWakeup -- routine for waking up processes when a lock is
- *             released.
+ *             released (or a prior waiter is aborted).  Scan all waiters
+ *             for lock, waken any that are no longer blocked.
  */
-int
-ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
+void
+ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock)
 {
-       PROC_QUEUE *queue = &(lock->waitProcs);
+       LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
+       PROC_QUEUE *waitQueue = &(lock->waitProcs);
+       int                     queue_size = waitQueue->size;
        PROC       *proc;
-       int                     awoken = 0;
-       LOCKMODE        last_lockmode = 0;
-       int                     queue_size = queue->size;
+       int                     conflictMask = 0;
 
        Assert(queue_size >= 0);
 
-       if (!queue_size)
-               return STATUS_NOT_FOUND;
+       if (queue_size == 0)
+               return;
 
-       proc = (PROC *) MAKE_PTR(queue->links.next);
+       proc = (PROC *) MAKE_PTR(waitQueue->links.next);
 
        while (queue_size-- > 0)
        {
-               if (proc->waitLockMode == last_lockmode)
-               {
-                       /*
-                        * This proc will conflict as the previous one did, don't even
-                        * try.
-                        */
-                       goto nextProc;
-               }
+               LOCKMODE lockmode = proc->waitLockMode;
 
                /*
-                * Does this proc conflict with locks held by others ?
+                * Waken if (a) doesn't conflict with requests of earlier waiters,
+                * and (b) doesn't conflict with already-held locks.
                 */
-               if (LockResolveConflicts(lockmethod,
-                                                                proc->waitLockMode,
-                                                                lock,
-                                                                proc->waitHolder,
-                                                                proc,
-                                                                NULL) != STATUS_OK)
+               if (((1 << lockmode) & conflictMask) == 0 &&
+                       LockCheckConflicts(lockMethodTable,
+                                                          lockmode,
+                                                          lock,
+                                                          proc->waitHolder,
+                                                          proc,
+                                                          NULL) == STATUS_OK)
                {
-                       /* Yes.  Quit if we already awoke at least one process. */
-                       if (awoken != 0)
-                               break;
-                       /* Otherwise, see if any later waiters can be awoken. */
-                       last_lockmode = proc->waitLockMode;
-                       goto nextProc;
+                       /* OK to waken */
+                       GrantLock(lock, proc->waitHolder, lockmode);
+                       proc = ProcWakeup(proc, STATUS_OK);
+                       /*
+                        * ProcWakeup removes proc from the lock's waiting process queue
+                        * and returns the next proc in chain; don't use proc's next-link,
+                        * because it's been cleared.
+                        */
                }
-
-               /*
-                * OK to wake up this sleeping process.
-                */
-               GrantLock(lock, proc->waitHolder, proc->waitLockMode);
-               proc = ProcWakeup(proc, NO_ERROR);
-               awoken++;
-
-               /*
-                * ProcWakeup removes proc from the lock's waiting process queue
-                * and returns the next proc in chain; don't use proc's next-link,
-                * because it's been cleared.
-                */
-               continue;
-
-nextProc:
-               proc = (PROC *) MAKE_PTR(proc->links.next);
-       }
-
-       Assert(queue->size >= 0);
-
-       if (awoken)
-               return STATUS_OK;
-       else
-       {
-               /* Something is still blocking us.      May have deadlocked. */
-#ifdef LOCK_DEBUG
-               if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+               else
                {
-                       elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process",
-                                MAKE_OFFSET(lock));
-                       if (Debug_deadlocks)
-                               DumpAllLocks();
+                       /* Cannot wake this guy.  Add his request to conflict mask. */
+                       conflictMask |= lockctl->conflictTab[lockmode];
+                       proc = (PROC *) MAKE_PTR(proc->links.next);
                }
-#endif
-               return STATUS_NOT_FOUND;
        }
+
+       Assert(waitQueue->size >= 0);
 }
 
 /* --------------------
@@ -900,7 +831,7 @@ HandleDeadLock(SIGNAL_ARGS)
         DumpAllLocks();
 #endif
 
-       if (!DeadLockCheck(MyProc, MyProc->waitLock))
+       if (!DeadLockCheck(MyProc))
        {
                /* No deadlock, so keep waiting */
                UnlockLockTable();
index 39eefac0ca6f827437a4550e8e9c842a5be0591a..0ada6eaccfebee6e5b18db0934e3746dce7bdae4 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: lock.h,v 1.43 2001/01/24 19:43:27 momjian Exp $
+ * $Id: lock.h,v 1.44 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -247,6 +247,7 @@ typedef struct HOLDER
 extern void InitLocks(void);
 extern void LockDisable(bool status);
 extern bool LockingDisabled(void);
+extern LOCKMETHODTABLE *GetLocksMethodTable(LOCK *lock);
 extern LOCKMETHOD LockMethodTableInit(char *tabName, LOCKMASK *conflictsP,
                                        int *prioP, int numModes, int maxBackends);
 extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
@@ -256,12 +257,15 @@ extern bool LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                                                TransactionId xid, LOCKMODE lockmode);
 extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                                                   bool allxids, TransactionId xid);
-extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCKMODE lockmode,
-                                                               LOCK *lock, HOLDER *holder, PROC *proc,
-                                                               int *myHolding);
+extern int LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+                                                         LOCKMODE lockmode,
+                                                         LOCK *lock, HOLDER *holder, PROC *proc,
+                                                         int *myHolding);
 extern void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode);
+extern void RemoveFromWaitQueue(PROC *proc);
 extern int     LockShmemSize(int maxBackends);
-extern bool DeadLockCheck(PROC *thisProc, LOCK *findlock);
+extern bool DeadLockCheck(PROC *proc);
+extern void InitDeadLockChecking(void);
 
 #ifdef LOCK_DEBUG
 extern void DumpLocks(void);
index 3f8902e7d373850032b2cddfadab005bb246aae4..4dd5a8c2a676e04930daa3693586923942f9db96 100644 (file)
@@ -7,7 +7,7 @@
  * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $Id: proc.h,v 1.38 2001/01/24 19:43:28 momjian Exp $
+ * $Id: proc.h,v 1.39 2001/01/25 03:31:16 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -41,7 +41,7 @@ struct proc
        SHM_QUEUE       links;                  /* list link if process is in a list */
 
        SEMA            sem;                    /* ONE semaphore to sleep on */
-       int                     errType;                /* error code tells why we woke up */
+       int                     errType;                /* STATUS_OK or STATUS_ERROR after wakeup */
 
        TransactionId xid;                      /* transaction currently being executed by
                                                                 * this proc */
@@ -86,13 +86,6 @@ do { \
        if (MyProc) (MyProc->sLocks[(lock)])--; \
 } while (0)
 
-/*
- * flags explaining why process woke up
- */
-#define NO_ERROR               0
-#define ERR_TIMEOUT            1
-#define ERR_BUFFER_IO  2
-
 
 /*
  * There is one ProcGlobal struct for the whole installation.
@@ -134,10 +127,10 @@ extern void ProcReleaseLocks(bool isCommit);
 extern bool ProcRemove(int pid);
 
 extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int ProcSleep(LOCKMETHODCTL *lockctl, LOCKMODE lockmode,
+extern int ProcSleep(LOCKMETHODTABLE *lockMethodTable, LOCKMODE lockmode,
                                         LOCK *lock, HOLDER *holder);
 extern PROC *ProcWakeup(PROC *proc, int errType);
-extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock);
+extern void ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock);
 extern void ProcReleaseSpins(PROC *proc);
 extern bool LockWaitCancel(void);
 extern void HandleDeadLock(SIGNAL_ARGS);