to pghackers on 18-Jan-01.
# Makefile for storage/lmgr
#
# IDENTIFICATION
-# $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.14 2000/08/31 16:10:36 petere Exp $
+# $Header: /cvsroot/pgsql/src/backend/storage/lmgr/Makefile,v 1.15 2001/01/25 03:31:16 tgl Exp $
#
#-------------------------------------------------------------------------
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = lmgr.o lock.o proc.o
+OBJS = lmgr.o lock.o proc.o deadlock.o
all: SUBSYS.o
-$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.6 2001/01/22 22:30:06 tgl Exp $
+$Header: /cvsroot/pgsql/src/backend/storage/lmgr/README,v 1.7 2001/01/25 03:31:16 tgl Exp $
There are two fundamental lock structures: the per-lockable-object LOCK
struct, and the per-lock-holder HOLDER struct. A LOCK object exists
soft edges outgoing from C starting at the front of the wait queue.
5. The working data structures needed by the deadlock detection code can
-be proven not to need more than MAXBACKENDS entries. Therefore the
-working storage can be statically allocated instead of depending on
-palloc(). This is a good thing, since if the deadlock detector could
-fail for extraneous reasons, all the above safety proofs fall down.
+be limited to numbers of entries computed from MaxBackends. Therefore,
+we can allocate the worst-case space needed during backend startup.
+This seems a safer approach than trying to allocate workspace on the fly;
+we don't want to risk having the deadlock detector run out of memory,
+else we really have no guarantees at all that deadlock will be detected.
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * deadlock.c
+ * POSTGRES deadlock detection code
+ *
+ * See src/backend/storage/lmgr/README for a description of the deadlock
+ * detection and resolution algorithms.
+ *
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.1 2001/01/25 03:31:16 tgl Exp $
+ *
+ * Interface:
+ *
+ * DeadLockCheck()
+ * InitDeadLockChecking()
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "utils/memutils.h"
+
+
+/* One edge in the waits-for graph */
+typedef struct {
+ PROC *waiter; /* the waiting process */
+ PROC *blocker; /* the process it is waiting for */
+ int pred; /* workspace for TopoSort */
+ int link; /* workspace for TopoSort */
+} EDGE;
+
+/* One potential reordering of a lock's wait queue */
+typedef struct {
+ LOCK *lock; /* the lock whose wait queue is described */
+ PROC **procs; /* array of PROC *'s in new wait order */
+ int nProcs;
+} WAIT_ORDER;
+
+
+static bool DeadLockCheckRecurse(PROC *proc);
+static bool TestConfiguration(PROC *startProc);
+static bool FindLockCycle(PROC *checkProc,
+ EDGE *softEdges, int *nSoftEdges);
+static bool FindLockCycleRecurse(PROC *checkProc,
+ EDGE *softEdges, int *nSoftEdges);
+static bool ExpandConstraints(EDGE *constraints, int nConstraints);
+static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
+ PROC **ordering);
+#ifdef DEBUG_DEADLOCK
+static void PrintLockQueue(LOCK *lock, const char *info);
+#endif
+
+
+/*
+ * Working space for the deadlock detector
+ */
+
+/* Workspace for FindLockCycle */
+static PROC **visitedProcs; /* Array of visited procs */
+static int nVisitedProcs;
+/* Workspace for TopoSort */
+static PROC **topoProcs; /* Array of not-yet-output procs */
+static int *beforeConstraints; /* Counts of remaining before-constraints */
+static int *afterConstraints; /* List head for after-constraints */
+/* Output area for ExpandConstraints */
+static WAIT_ORDER *waitOrders; /* Array of proposed queue rearrangements */
+static int nWaitOrders;
+static PROC **waitOrderProcs; /* Space for waitOrders queue contents */
+/* Current list of constraints being considered */
+static EDGE *curConstraints;
+static int nCurConstraints;
+static int maxCurConstraints;
+/* Storage space for results from FindLockCycle */
+static EDGE *possibleConstraints;
+static int nPossibleConstraints;
+static int maxPossibleConstraints;
+
+
+/*
+ * InitDeadLockChecking -- initialize deadlock checker during backend startup
+ *
+ * This does per-backend initialization of the deadlock checker; primarily,
+ * allocation of working memory for DeadLockCheck. We do this per-backend
+ * since there's no percentage in making the kernel do copy-on-write
+ * inheritance of workspace from the postmaster. We want to allocate the
+ * space at startup because the deadlock checker might be invoked when there's
+ * no free memory left.
+ */
+void
+InitDeadLockChecking(void)
+{
+ MemoryContext oldcxt;
+
+ /* Make sure allocations are permanent */
+ oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+
+ /*
+ * FindLockCycle needs at most MaxBackends entries in visitedProcs[]
+ */
+ visitedProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
+
+ /*
+ * TopoSort needs to consider at most MaxBackends wait-queue entries,
+ * and it needn't run concurrently with FindLockCycle.
+ */
+ topoProcs = visitedProcs; /* re-use this space */
+ beforeConstraints = (int *) palloc(MaxBackends * sizeof(int));
+ afterConstraints = (int *) palloc(MaxBackends * sizeof(int));
+
+ /*
+ * We need to consider rearranging at most MaxBackends/2 wait queues
+ * (since it takes at least two waiters in a queue to create a soft edge),
+ * and the expanded form of the wait queues can't involve more than
+ * MaxBackends total waiters.
+ */
+ waitOrders = (WAIT_ORDER *) palloc((MaxBackends/2) * sizeof(WAIT_ORDER));
+ waitOrderProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
+
+ /*
+ * Allow at most MaxBackends distinct constraints in a configuration.
+ * (Is this enough? In practice it seems it should be, but I don't quite
+ * see how to prove it. If we run out, we might fail to find a workable
+ * wait queue rearrangement even though one exists.) NOTE that this
+ * number limits the maximum recursion depth of DeadLockCheckRecurse.
+ * Making it really big might potentially allow a stack-overflow problem.
+ */
+ maxCurConstraints = MaxBackends;
+ curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));
+
+ /*
+ * Allow up to 3*MaxBackends constraints to be saved without having to
+ * re-run TestConfiguration. (This is probably more than enough, but
+ * we can survive if we run low on space by doing excess runs of
+ * TestConfiguration to re-compute constraint lists each time needed.)
+ * The last MaxBackends entries in possibleConstraints[] are reserved as
+ * output workspace for FindLockCycle.
+ */
+ maxPossibleConstraints = MaxBackends * 4;
+ possibleConstraints =
+ (EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE));
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+/*
+ * DeadLockCheck -- Checks for deadlocks for a given process
+ *
+ * This code looks for deadlocks involving the given process. If any
+ * are found, it tries to rearrange lock wait queues to resolve the
+ * deadlock. If resolution is impossible, return TRUE --- the caller
+ * is then expected to abort the given proc's transaction.
+ *
+ * We can't block on user locks, so no sense testing for deadlock
+ * because there is no blocking, and no timer for the block. So,
+ * only look at regular locks.
+ *
+ * We must have already locked the master lock before being called.
+ * NOTE: although the lockctl structure appears to allow each lock
+ * table to have a different spinlock, all locks that can block had
+ * better use the same spinlock, else this code will not be adequately
+ * interlocked!
+ */
+bool
+DeadLockCheck(PROC *proc)
+{
+ int i,
+ j;
+
+ /* Initialize to "no constraints" */
+ nCurConstraints = 0;
+ nPossibleConstraints = 0;
+ nWaitOrders = 0;
+
+ /* Search for deadlocks and possible fixes */
+ if (DeadLockCheckRecurse(proc))
+ return true; /* cannot find a non-deadlocked state */
+
+ /* Apply any needed rearrangements of wait queues */
+ for (i = 0; i < nWaitOrders; i++)
+ {
+ LOCK *lock = waitOrders[i].lock;
+ PROC **procs = waitOrders[i].procs;
+ int nProcs = waitOrders[i].nProcs;
+ PROC_QUEUE *waitQueue = &(lock->waitProcs);
+
+ Assert(nProcs == waitQueue->size);
+
+#ifdef DEBUG_DEADLOCK
+ PrintLockQueue(lock, "DeadLockCheck:");
+#endif
+
+ /* Reset the queue and re-add procs in the desired order */
+ ProcQueueInit(waitQueue);
+ for (j = 0; j < nProcs; j++)
+ {
+ SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
+ waitQueue->size++;
+ }
+
+#ifdef DEBUG_DEADLOCK
+ PrintLockQueue(lock, "rearranged to:");
+#endif
+ }
+ return false;
+}
+
+/*
+ * DeadLockCheckRecurse -- recursively search for valid orderings
+ *
+ * curConstraints[] holds the current set of constraints being considered
+ * by an outer level of recursion. Add to this each possible solution
+ * constraint for any cycle detected at this level.
+ *
+ * Returns TRUE if no solution exists. Returns FALSE if a deadlock-free
+ * state is attainable, in which case waitOrders[] shows the required
+ * rearrangements of lock wait queues (if any).
+ */
+static bool
+DeadLockCheckRecurse(PROC *proc)
+{
+ int nEdges;
+ int oldPossibleConstraints;
+ bool savedList;
+ int i;
+
+ nEdges = TestConfiguration(proc);
+ if (nEdges < 0)
+ return true; /* hard deadlock --- no solution */
+ if (nEdges == 0)
+ return false; /* good configuration found */
+ if (nCurConstraints >= maxCurConstraints)
+ return true; /* out of room for active constraints? */
+ oldPossibleConstraints = nPossibleConstraints;
+ if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints)
+ {
+ /* We can save the edge list in possibleConstraints[] */
+ nPossibleConstraints += nEdges;
+ savedList = true;
+ }
+ else
+ {
+ /* Not room; will need to regenerate the edges on-the-fly */
+ savedList = false;
+ }
+ /*
+ * Try each available soft edge as an addition to the configuration.
+ */
+ for (i = 0; i < nEdges; i++)
+ {
+ if (!savedList && i > 0)
+ {
+ /* Regenerate the list of possible added constraints */
+ if (nEdges != TestConfiguration(proc))
+ elog(FATAL, "DeadLockCheckRecurse: inconsistent results");
+ }
+ curConstraints[nCurConstraints] =
+ possibleConstraints[oldPossibleConstraints+i];
+ nCurConstraints++;
+ if (!DeadLockCheckRecurse(proc))
+ return false; /* found a valid solution! */
+ /* give up on that added constraint, try again */
+ nCurConstraints--;
+ }
+ nPossibleConstraints = oldPossibleConstraints;
+ return true; /* no solution found */
+}
+
+
+/*--------------------
+ * Test a configuration (current set of constraints) for validity.
+ *
+ * Returns:
+ * 0: the configuration is good (no deadlocks)
+ * -1: the configuration has a hard deadlock or is not self-consistent
+ * >0: the configuration has one or more soft deadlocks
+ *
+ * In the soft-deadlock case, one of the soft cycles is chosen arbitrarily
+ * and a list of its soft edges is returned beginning at
+ * possibleConstraints+nPossibleConstraints. The return value is the
+ * number of soft edges.
+ *--------------------
+ */
+static bool
+TestConfiguration(PROC *startProc)
+{
+ int softFound = 0;
+ EDGE *softEdges = possibleConstraints + nPossibleConstraints;
+ int nSoftEdges;
+ int i;
+
+ /*
+ * Make sure we have room for FindLockCycle's output.
+ */
+ if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
+ return -1;
+ /*
+ * Expand current constraint set into wait orderings. Fail if the
+ * constraint set is not self-consistent.
+ */
+ if (!ExpandConstraints(curConstraints, nCurConstraints))
+ return -1;
+ /*
+ * Check for cycles involving startProc or any of the procs mentioned
+ * in constraints. We check startProc last because if it has a soft
+ * cycle still to be dealt with, we want to deal with that first.
+ */
+ for (i = 0; i < nCurConstraints; i++)
+ {
+ if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges))
+ {
+ if (nSoftEdges == 0)
+ return -1; /* hard deadlock detected */
+ softFound = nSoftEdges;
+ }
+ if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges))
+ {
+ if (nSoftEdges == 0)
+ return -1; /* hard deadlock detected */
+ softFound = nSoftEdges;
+ }
+ }
+ if (FindLockCycle(startProc, softEdges, &nSoftEdges))
+ {
+ if (nSoftEdges == 0)
+ return -1; /* hard deadlock detected */
+ softFound = nSoftEdges;
+ }
+ return softFound;
+}
+
+
+/*
+ * FindLockCycle -- basic check for deadlock cycles
+ *
+ * Scan outward from the given proc to see if there is a cycle in the
+ * waits-for graph that includes this proc. Return TRUE if a cycle
+ * is found, else FALSE. If a cycle is found, we also return a list of
+ * the "soft edges", if any, included in the cycle. These edges could
+ * potentially be eliminated by rearranging wait queues.
+ *
+ * Since we need to be able to check hypothetical configurations that would
+ * exist after wait queue rearrangement, the routine pays attention to the
+ * table of hypothetical queue orders in waitOrders[]. These orders will
+ * be believed in preference to the actual ordering seen in the locktable.
+ */
+static bool
+FindLockCycle(PROC *checkProc,
+ EDGE *softEdges, /* output argument */
+ int *nSoftEdges) /* output argument */
+{
+ nVisitedProcs = 0;
+ *nSoftEdges = 0;
+ return FindLockCycleRecurse(checkProc, softEdges, nSoftEdges);
+}
+
+static bool
+FindLockCycleRecurse(PROC *checkProc,
+ EDGE *softEdges, /* output argument */
+ int *nSoftEdges) /* output argument */
+{
+ PROC *proc;
+ LOCK *lock;
+ HOLDER *holder;
+ SHM_QUEUE *lockHolders;
+ LOCKMETHODTABLE *lockMethodTable;
+ LOCKMETHODCTL *lockctl;
+ PROC_QUEUE *waitQueue;
+ int queue_size;
+ int conflictMask;
+ int i;
+ int numLockModes,
+ lm;
+
+ /*
+ * Have we already seen this proc?
+ */
+ for (i = 0; i < nVisitedProcs; i++)
+ {
+ if (visitedProcs[i] == checkProc)
+ {
+ /* If we return to starting point, we have a deadlock cycle */
+ if (i == 0)
+ return true;
+ /*
+ * Otherwise, we have a cycle but it does not include the start
+ * point, so say "no deadlock".
+ */
+ return false;
+ }
+ }
+ /* Mark proc as seen */
+ Assert(nVisitedProcs < MaxBackends);
+ visitedProcs[nVisitedProcs++] = checkProc;
+ /*
+ * If the proc is not waiting, we have no outgoing waits-for edges.
+ */
+ if (checkProc->links.next == INVALID_OFFSET)
+ return false;
+ lock = checkProc->waitLock;
+ if (lock == NULL)
+ return false;
+ lockMethodTable = GetLocksMethodTable(lock);
+ lockctl = lockMethodTable->ctl;
+ numLockModes = lockctl->numLockModes;
+ conflictMask = lockctl->conflictTab[checkProc->waitLockMode];
+ /*
+ * Scan for procs that already hold conflicting locks. These are
+ * "hard" edges in the waits-for graph.
+ */
+ lockHolders = &(lock->lockHolders);
+
+ holder = (HOLDER *) SHMQueueNext(lockHolders, lockHolders,
+ offsetof(HOLDER, lockLink));
+
+ while (holder)
+ {
+ proc = (PROC *) MAKE_PTR(holder->tag.proc);
+
+ /* A proc never blocks itself */
+ if (proc != checkProc)
+ {
+ for (lm = 1; lm <= numLockModes; lm++)
+ {
+ if (holder->holding[lm] > 0 &&
+ ((1 << lm) & conflictMask) != 0)
+ {
+ /* This proc hard-blocks checkProc */
+ if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+ return true;
+ /* If no deadlock, we're done looking at this holder */
+ break;
+ }
+ }
+ }
+
+ holder = (HOLDER *) SHMQueueNext(lockHolders, &holder->lockLink,
+ offsetof(HOLDER, lockLink));
+ }
+
+ /*
+ * Scan for procs that are ahead of this one in the lock's wait queue.
+ * Those that have conflicting requests soft-block this one. This must
+ * be done after the hard-block search, since if another proc both
+ * hard- and soft-blocks this one, we want to call it a hard edge.
+ *
+ * If there is a proposed re-ordering of the lock's wait order,
+ * use that rather than the current wait order.
+ */
+ for (i = 0; i < nWaitOrders; i++)
+ {
+ if (waitOrders[i].lock == lock)
+ break;
+ }
+
+ if (i < nWaitOrders)
+ {
+ /* Use the given hypothetical wait queue order */
+ PROC **procs = waitOrders[i].procs;
+
+ queue_size = waitOrders[i].nProcs;
+
+ for (i = 0; i < queue_size; i++)
+ {
+ proc = procs[i];
+
+ /* Done when we reach the target proc */
+ if (proc == checkProc)
+ break;
+
+ /* Is there a conflict with this guy's request? */
+ if (((1 << proc->waitLockMode) & conflictMask) != 0)
+ {
+ /* This proc soft-blocks checkProc */
+ if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+ {
+ /* Add this edge to the list of soft edges in the cycle */
+ Assert(*nSoftEdges < MaxBackends);
+ softEdges[*nSoftEdges].waiter = checkProc;
+ softEdges[*nSoftEdges].blocker = proc;
+ (*nSoftEdges)++;
+ return true;
+ }
+ }
+ }
+ }
+ else
+ {
+ /* Use the true lock wait queue order */
+ waitQueue = &(lock->waitProcs);
+ queue_size = waitQueue->size;
+
+ proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+
+ while (queue_size-- > 0)
+ {
+ /* Done when we reach the target proc */
+ if (proc == checkProc)
+ break;
+
+ /* Is there a conflict with this guy's request? */
+ if (((1 << proc->waitLockMode) & conflictMask) != 0)
+ {
+ /* This proc soft-blocks checkProc */
+ if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
+ {
+ /* Add this edge to the list of soft edges in the cycle */
+ Assert(*nSoftEdges < MaxBackends);
+ softEdges[*nSoftEdges].waiter = checkProc;
+ softEdges[*nSoftEdges].blocker = proc;
+ (*nSoftEdges)++;
+ return true;
+ }
+ }
+
+ proc = (PROC *) MAKE_PTR(proc->links.next);
+ }
+ }
+
+ /*
+ * No conflict detected here.
+ */
+ return false;
+}
+
+
+/*
+ * ExpandConstraints -- expand a list of constraints into a set of
+ * specific new orderings for affected wait queues
+ *
+ * Input is a list of soft edges to be reversed. The output is a list
+ * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PROC array
+ * workspace in waitOrderProcs[].
+ *
+ * Returns TRUE if able to build an ordering that satisfies all the
+ * constraints, FALSE if not (there are contradictory constraints).
+ */
+static bool
+ExpandConstraints(EDGE *constraints,
+ int nConstraints)
+{
+ int nWaitOrderProcs = 0;
+ int i,
+ j;
+
+ nWaitOrders = 0;
+ /*
+ * Scan constraint list backwards. This is because the last-added
+ * constraint is the only one that could fail, and so we want to test
+ * it for inconsistency first.
+ */
+ for (i = nConstraints; --i >= 0; )
+ {
+ PROC *proc = constraints[i].waiter;
+ LOCK *lock = proc->waitLock;
+
+ /* Did we already make a list for this lock? */
+ for (j = nWaitOrders; --j >= 0; )
+ {
+ if (waitOrders[j].lock == lock)
+ break;
+ }
+ if (j >= 0)
+ continue;
+ /* No, so allocate a new list */
+ waitOrders[nWaitOrders].lock = lock;
+ waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
+ waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
+ nWaitOrderProcs += lock->waitProcs.size;
+ Assert(nWaitOrderProcs <= MaxBackends);
+ /*
+ * Do the topo sort. TopoSort need not examine constraints after
+ * this one, since they must be for different locks.
+ */
+ if (!TopoSort(lock, constraints, i+1,
+ waitOrders[nWaitOrders].procs))
+ return false;
+ nWaitOrders++;
+ }
+ return true;
+}
+
+
+/*
+ * TopoSort -- topological sort of a wait queue
+ *
+ * Generate a re-ordering of a lock's wait queue that satisfies given
+ * constraints about certain procs preceding others. (Each such constraint
+ * is a fact of a partial ordering.) Minimize rearrangement of the queue
+ * not needed to achieve the partial ordering.
+ *
+ * This is a lot simpler and slower than, for example, the topological sort
+ * algorithm shown in Knuth's Volume 1. However, Knuth's method doesn't
+ * try to minimize the damage to the existing order. In practice we are
+ * not likely to be working with more than a few constraints, so the apparent
+ * slowness of the algorithm won't really matter.
+ *
+ * The initial queue ordering is taken directly from the lock's wait queue.
+ * The output is an array of PROC pointers, of length equal to the lock's
+ * wait queue length (the caller is responsible for providing this space).
+ * The partial order is specified by an array of EDGE structs. Each EDGE
+ * is one that we need to reverse, therefore the "waiter" must appear before
+ * the "blocker" in the output array. The EDGE array may well contain
+ * edges associated with other locks; these should be ignored.
+ *
+ * Returns TRUE if able to build an ordering that satisfies all the
+ * constraints, FALSE if not (there are contradictory constraints).
+ */
+static bool
+TopoSort(LOCK *lock,
+ EDGE *constraints,
+ int nConstraints,
+ PROC **ordering) /* output argument */
+{
+ PROC_QUEUE *waitQueue = &(lock->waitProcs);
+ int queue_size = waitQueue->size;
+ PROC *proc;
+ int i,
+ j,
+ k,
+ last;
+
+ /* First, fill topoProcs[] array with the procs in their current order */
+ proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+ for (i = 0; i < queue_size; i++)
+ {
+ topoProcs[i] = proc;
+ proc = (PROC *) MAKE_PTR(proc->links.next);
+ }
+
+ /*
+ * Scan the constraints, and for each proc in the array, generate a count
+ * of the number of constraints that say it must be before something else,
+ * plus a list of the constraints that say it must be after something else.
+ * The count for the j'th proc is stored in beforeConstraints[j], and the
+ * head of its list in afterConstraints[j]. Each constraint stores its
+ * list link in constraints[i].link (note any constraint will be in
+ * just one list). The array index for the before-proc of the i'th
+ * constraint is remembered in constraints[i].pred.
+ */
+ MemSet(beforeConstraints, 0, queue_size * sizeof(int));
+ MemSet(afterConstraints, 0, queue_size * sizeof(int));
+ for (i = 0; i < nConstraints; i++)
+ {
+ proc = constraints[i].waiter;
+ /* Ignore constraint if not for this lock */
+ if (proc->waitLock != lock)
+ continue;
+ /* Find the waiter proc in the array */
+ for (j = queue_size; --j >= 0; )
+ {
+ if (topoProcs[j] == proc)
+ break;
+ }
+ Assert(j >= 0); /* should have found a match */
+ /* Find the blocker proc in the array */
+ proc = constraints[i].blocker;
+ for (k = queue_size; --k >= 0; )
+ {
+ if (topoProcs[k] == proc)
+ break;
+ }
+ Assert(k >= 0); /* should have found a match */
+ beforeConstraints[j]++; /* waiter must come before */
+ /* add this constraint to list of after-constraints for blocker */
+ constraints[i].pred = j;
+ constraints[i].link = afterConstraints[k];
+ afterConstraints[k] = i+1;
+ }
+ /*--------------------
+ * Now scan the topoProcs array backwards. At each step, output the
+ * last proc that has no remaining before-constraints, and decrease
+ * the beforeConstraints count of each of the procs it was constrained
+ * against.
+ * i = index of ordering[] entry we want to output this time
+ * j = search index for topoProcs[]
+ * k = temp for scanning constraint list for proc j
+ * last = last non-null index in topoProcs (avoid redundant searches)
+ *--------------------
+ */
+ last = queue_size-1;
+ for (i = queue_size; --i >= 0; )
+ {
+ /* Find next candidate to output */
+ while (topoProcs[last] == NULL)
+ last--;
+ for (j = last; j >= 0; j--)
+ {
+ if (topoProcs[j] != NULL && beforeConstraints[j] == 0)
+ break;
+ }
+ /* If no available candidate, topological sort fails */
+ if (j < 0)
+ return false;
+ /* Output candidate, and mark it done by zeroing topoProcs[] entry */
+ ordering[i] = topoProcs[j];
+ topoProcs[j] = NULL;
+ /* Update beforeConstraints counts of its predecessors */
+ for (k = afterConstraints[j]; k > 0; k = constraints[k-1].link)
+ {
+ beforeConstraints[constraints[k-1].pred]--;
+ }
+ }
+
+ /* Done */
+ return true;
+}
+
+#ifdef DEBUG_DEADLOCK
+static void
+PrintLockQueue(LOCK *lock, const char *info)
+{
+ PROC_QUEUE *waitQueue = &(lock->waitProcs);
+ int queue_size = waitQueue->size;
+ PROC *proc;
+ int i;
+
+ printf("%s lock %lx queue ", info, MAKE_OFFSET(lock));
+ proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+ for (i = 0; i < queue_size; i++)
+ {
+ printf(" %d", proc->pid);
+ proc = (PROC *) MAKE_PTR(proc->links.next);
+ }
+ printf("\n");
+ fflush(stdout);
+}
+#endif
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.80 2001/01/24 19:43:08 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.81 2001/01/25 03:31:16 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
*
* LockAcquire(), LockRelease(), LockMethodTableInit(),
* LockMethodTableRename(), LockReleaseAll,
- * LockResolveConflicts(), GrantLock()
+ * LockCheckConflicts(), GrantLock()
*
*-------------------------------------------------------------------------
*/
+#include "postgres.h"
+
#include <sys/types.h>
#include <unistd.h>
#include <signal.h>
-#include "postgres.h"
-
#include "access/xact.h"
#include "miscadmin.h"
#include "storage/proc.h"
LOCK *lock, HOLDER *holder);
static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
int *myHolding);
-static int LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc);
static char *lock_types[] =
{
return LockingIsDisabled;
}
+/*
+ * Fetch the lock method table associated with a given lock
+ */
+LOCKMETHODTABLE *
+GetLocksMethodTable(LOCK *lock)
+{
+ LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock);
+
+ Assert(lockmethod > 0 && lockmethod < NumLockMethods);
+ return LockMethodTable[lockmethod];
+}
+
/*
* LockMethodInit -- initialize the lock table's lock type
if (!holder)
{
SpinRelease(masterLock);
- elog(NOTICE, "LockAcquire: holder table corrupted");
+ elog(FATAL, "LockAcquire: holder table corrupted");
return FALSE;
}
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
/* --------------------
- * If I'm the only one holding any lock on this object, then there
- * cannot be a conflict. The same is true if I already hold this lock.
+ * If I already hold one or more locks of the requested type,
+ * just grant myself another one without blocking.
* --------------------
*/
- if (holder->nHolding == lock->nGranted || holder->holding[lockmode] != 0)
+ if (holder->holding[lockmode] > 0)
{
GrantLock(lock, holder, lockmode);
HOLDER_PRINT("LockAcquire: owning", holder);
/* --------------------
* If this process (under any XID) is a holder of the lock,
- * then there is no conflict, either.
+ * also grant myself another one without blocking.
* --------------------
*/
LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
- if (myHolding[lockmode] != 0)
+ if (myHolding[lockmode] > 0)
{
GrantLock(lock, holder, lockmode);
HOLDER_PRINT("LockAcquire: my other XID owning", holder);
return TRUE;
}
- /*
- * If lock requested conflicts with locks requested by waiters...
+ /* --------------------
+ * If lock requested conflicts with locks requested by waiters,
+ * must join wait queue. Otherwise, check for conflict with
+ * already-held locks. (That's last because most complex check.)
+ * --------------------
*/
if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
- {
- /*
- * If my process doesn't hold any locks that conflict with waiters
- * then force to sleep, so that prior waiters get first chance.
- */
- for (i = 1; i <= lockMethodTable->ctl->numLockModes; i++)
- {
- if (myHolding[i] > 0 &&
- lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
- break; /* yes, there is a conflict */
- }
-
- if (i > lockMethodTable->ctl->numLockModes)
- {
- HOLDER_PRINT("LockAcquire: another proc already waiting",
- holder);
- status = STATUS_FOUND;
- }
- else
- status = LockResolveConflicts(lockmethod, lockmode,
- lock, holder,
- MyProc, myHolding);
- }
+ status = STATUS_FOUND;
else
- status = LockResolveConflicts(lockmethod, lockmode,
- lock, holder,
- MyProc, myHolding);
+ status = LockCheckConflicts(lockMethodTable, lockmode,
+ lock, holder,
+ MyProc, myHolding);
if (status == STATUS_OK)
+ {
+ /* No conflict with held or previously requested locks */
GrantLock(lock, holder, lockmode);
- else if (status == STATUS_FOUND)
+ }
+ else
{
+ Assert(status == STATUS_FOUND);
#ifdef USER_LOCKS
/*
}
/* ----------------------------
- * LockResolveConflicts -- test for lock conflicts
+ * LockCheckConflicts -- test whether requested lock conflicts
+ * with those already granted
+ *
+ * Returns STATUS_FOUND if conflict, STATUS_OK if no conflict.
*
* NOTES:
- * Here's what makes this complicated: one transaction's
- * locks don't conflict with one another. When many processes
- * hold locks, each has to subtract off the other's locks when
- * determining whether or not any new lock acquired conflicts with
- * the old ones.
+ * Here's what makes this complicated: one process's locks don't
+ * conflict with one another, even if they are held under different
+ * transaction IDs (eg, session and xact locks do not conflict).
+ * So, we must subtract off our own locks when determining whether the
+ * requested new lock conflicts with those already held.
*
* The caller can optionally pass the process's total holding counts, if
* known. If NULL is passed then these values will be computed internally.
* ----------------------------
*/
int
-LockResolveConflicts(LOCKMETHOD lockmethod,
- LOCKMODE lockmode,
- LOCK *lock,
- HOLDER *holder,
- PROC *proc,
- int *myHolding) /* myHolding[] array or NULL */
+LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+ LOCKMODE lockmode,
+ LOCK *lock,
+ HOLDER *holder,
+ PROC *proc,
+ int *myHolding) /* myHolding[] array or NULL */
{
- LOCKMETHODCTL *lockctl = LockMethodTable[lockmethod]->ctl;
+ LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
int numLockModes = lockctl->numLockModes;
int bitmask;
int i,
tmpMask;
int localHolding[MAX_LOCKMODES];
- Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
-
/* ----------------------------
* first check for global conflicts: If no locks conflict
- * with mine, then I get the lock.
+ * with my request, then I get the lock.
*
* Checking for conflict: lock->grantMask represents the types of
* currently held locks. conflictTable[lockmode] has a bit
- * set for each type of lock that conflicts with mine. Bitwise
+ * set for each type of lock that conflicts with request. Bitwise
* compare tells if there is a conflict.
* ----------------------------
*/
if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
{
- HOLDER_PRINT("LockResolveConflicts: no conflict", holder);
+ HOLDER_PRINT("LockCheckConflicts: no conflict", holder);
return STATUS_OK;
}
if (!(lockctl->conflictTab[lockmode] & bitmask))
{
/* no conflict. OK to get the lock */
- HOLDER_PRINT("LockResolveConflicts: resolved", holder);
+ HOLDER_PRINT("LockCheckConflicts: resolved", holder);
return STATUS_OK;
}
- HOLDER_PRINT("LockResolveConflicts: conflicting", holder);
+ HOLDER_PRINT("LockCheckConflicts: conflicting", holder);
return STATUS_FOUND;
}
}
}
-/*
- * LockGetMyHeldLocks -- compute bitmask of lock types held by a process
- * for a given lockable object.
- */
-static int
-LockGetMyHeldLocks(SHMEM_OFFSET lockOffset, PROC *proc)
-{
- int myHolding[MAX_LOCKMODES];
- int heldLocks = 0;
- int i,
- tmpMask;
-
- LockCountMyLocks(lockOffset, proc, myHolding);
-
- for (i = 1, tmpMask = 2;
- i < MAX_LOCKMODES;
- i++, tmpMask <<= 1)
- {
- if (myHolding[i] > 0)
- heldLocks |= tmpMask;
- }
- return heldLocks;
-}
-
/*
* GrantLock -- update the lock and holder data structures to show
* the lock request has been granted.
+ *
+ * NOTE: if proc was blocked, it also needs to be removed from the wait list
+ * and have its waitLock/waitHolder fields cleared. That's not done here.
*/
void
GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode)
/*
* WaitOnLock -- wait to acquire a lock
*
+ * Caller must have set MyProc->heldLocks to reflect locks already held
+ * on the lockable object by this process (under all XIDs).
+ *
* The locktable spinlock must be held at entry.
*/
static int
strcat(new_status, " waiting");
set_ps_display(new_status);
- /*
+ /* -------------------
* NOTE: Think not to put any lock state cleanup after the call to
* ProcSleep, in either the normal or failure path. The lock state
* must be fully set by the lock grantor, or by HandleDeadLock if we
* after someone else grants us the lock, but before we've noticed it.
* Hence, after granting, the locktable state must fully reflect the
* fact that we own the lock; we can't do additional work on return.
+ * -------------------
*/
- if (ProcSleep(lockMethodTable->ctl,
+ if (ProcSleep(lockMethodTable,
lockmode,
lock,
- holder) != NO_ERROR)
+ holder) != STATUS_OK)
{
/* -------------------
* We failed as a result of a deadlock, see HandleDeadLock().
return STATUS_OK;
}
+/*--------------------
+ * Remove a proc from the wait-queue it is on
+ * (caller must know it is on one).
+ *
+ * Locktable lock must be held by caller.
+ *
+ * NB: this does not remove the process' holder object, nor the lock object,
+ * even though their counts might now have gone to zero. That will happen
+ * during a subsequent LockReleaseAll call, which we expect will happen
+ * during transaction cleanup. (Removal of a proc from its wait queue by
+ * this routine can only happen if we are aborting the transaction.)
+ *--------------------
+ */
+void
+RemoveFromWaitQueue(PROC *proc)
+{
+ LOCK *waitLock = proc->waitLock;
+ LOCKMODE lockmode = proc->waitLockMode;
+
+ /* Make sure proc is waiting */
+ Assert(proc->links.next != INVALID_OFFSET);
+ Assert(waitLock);
+ Assert(waitLock->waitProcs.size > 0);
+
+ /* Remove proc from lock's wait queue */
+ SHMQueueDelete(&(proc->links));
+ waitLock->waitProcs.size--;
+
+ /* Undo increments of request counts by waiting process */
+ Assert(waitLock->nRequested > 0);
+ Assert(waitLock->nRequested > proc->waitLock->nGranted);
+ waitLock->nRequested--;
+ Assert(waitLock->requested[lockmode] > 0);
+ waitLock->requested[lockmode]--;
+ /* don't forget to clear waitMask bit if appropriate */
+ if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
+ waitLock->waitMask &= BITS_OFF[lockmode];
+
+ /* Clean up the proc's own state */
+ proc->waitLock = NULL;
+ proc->waitHolder = NULL;
+
+ /* See if any other waiters for the lock can be woken up now */
+ ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock);
+}
+
/*
* LockRelease -- look up 'locktag' in lock table 'lockmethod' and
- * release it.
+ * release one 'lockmode' lock on it.
*
- * Side Effects: if the lock no longer conflicts with the highest
- * priority waiting process, that process is granted the lock
- * and awoken. (We have to grant the lock here to avoid a
- * race between the waking process and any new process to
+ * Side Effects: find any waiting processes that are now wakable,
+ * grant them their requested locks and awaken them.
+ * (We have to grant the lock here to avoid a race between
+ * the waking process and any new process to
* come along and request the lock.)
*/
bool
HOLDER *holder;
HOLDERTAG holdertag;
HTAB *holderTable;
- bool wakeupNeeded = true;
+ bool wakeupNeeded = false;
#ifdef LOCK_DEBUG
if (lockmethod == USER_LOCKMETHOD && Trace_userlocks)
return FALSE;
}
HOLDER_PRINT("LockRelease: found", holder);
- Assert(holder->tag.lock == MAKE_OFFSET(lock));
/*
* Check that we are actually holding a lock of the type we want to
*/
if (!(holder->holding[lockmode] > 0))
{
- SpinRelease(masterLock);
HOLDER_PRINT("LockRelease: WRONGTYPE", holder);
+ Assert(holder->holding[lockmode] >= 0);
+ SpinRelease(masterLock);
elog(NOTICE, "LockRelease: you don't own a lock of type %s",
lock_types[lockmode]);
- Assert(holder->holding[lockmode] >= 0);
return FALSE;
}
Assert(holder->nHolding > 0);
lock->grantMask &= BITS_OFF[lockmode];
}
-#ifdef NOT_USED
+ LOCK_PRINT("LockRelease: updated", lock, lockmode);
+ Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
+ Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
+ Assert(lock->nGranted <= lock->nRequested);
+
/* --------------------------
- * If there are still active locks of the type I just released, no one
- * should be woken up. Whoever is asleep will still conflict
- * with the remaining locks.
+ * We need only run ProcLockWakeup if the released lock conflicts with
+ * at least one of the lock types requested by waiter(s). Otherwise
+ * whatever conflict made them wait must still exist. NOTE: before MVCC,
+ * we could skip wakeup if lock->granted[lockmode] was still positive.
+ * But that's not true anymore, because the remaining granted locks might
+ * belong to some waiter, who could now be awakened because he doesn't
+ * conflict with his own locks.
* --------------------------
*/
- if (lock->granted[lockmode])
- wakeupNeeded = false;
- else
-#endif
-
- /*
- * Above is not valid any more (due to MVCC lock modes). Actually
- * we should compare granted[lockmode] with number of
- * waiters holding lock of this type and try to wakeup only if
- * these numbers are equal (and lock released conflicts with locks
- * requested by waiters). For the moment we only check the last
- * condition.
- */
if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
wakeupNeeded = true;
- LOCK_PRINT("LockRelease: updated", lock, lockmode);
- Assert((lock->nRequested >= 0) && (lock->requested[lockmode] >= 0));
- Assert((lock->nGranted >= 0) && (lock->granted[lockmode] >= 0));
- Assert(lock->nGranted <= lock->nRequested);
-
if (lock->nRequested == 0)
{
/* ------------------
(Pointer) &(lock->tag),
HASH_REMOVE,
&found);
- Assert(lock && found);
- wakeupNeeded = false;
+ if (!lock || !found)
+ {
+ SpinRelease(masterLock);
+ elog(NOTICE, "LockRelease: remove lock, table corrupted");
+ return FALSE;
+ }
+ wakeupNeeded = false; /* should be false, but make sure */
}
/*
}
}
+ /*
+ * Wake up waiters if needed.
+ */
if (wakeupNeeded)
- ProcLockWakeup(lockmethod, lock);
-#ifdef LOCK_DEBUG
- else if (LOCK_DEBUG_ENABLED(lock))
- elog(DEBUG, "LockRelease: no wakeup needed");
-#endif
+ ProcLockWakeup(lockMethodTable, lock);
SpinRelease(masterLock);
return TRUE;
else
{
/* --------------
- * set nRequested to zero so that we can garbage collect the lock
- * down below...
+ * This holder accounts for all the requested locks on the object,
+ * so we can be lazy and just zero things out.
* --------------
*/
lock->nRequested = 0;
return FALSE;
}
- if (!lock->nRequested)
+ if (lock->nRequested == 0)
{
/* --------------------
* We've just released the last lock, so garbage-collect the
lock = (LOCK *) hash_search(lockMethodTable->lockHash,
(Pointer) &(lock->tag),
HASH_REMOVE, &found);
- if ((!lock) || (!found))
+ if (!lock || !found)
{
SpinRelease(masterLock);
elog(NOTICE, "LockReleaseAll: cannot remove lock from HTAB");
}
}
else if (wakeupNeeded)
- ProcLockWakeup(lockmethod, lock);
+ ProcLockWakeup(lockMethodTable, lock);
next_item:
holder = nextHolder;
return size;
}
-/*
- * DeadLockCheck -- Checks for deadlocks for a given process
- *
- * This code takes a list of locks a process holds, and the lock that
- * the process is sleeping on, and tries to find if any of the processes
- * waiting on its locks hold the lock it is waiting for. If no deadlock
- * is found, it goes on to look at all the processes waiting on their locks.
- *
- * We can't block on user locks, so no sense testing for deadlock
- * because there is no blocking, and no timer for the block. So,
- * only look at regular locks.
- *
- * We have already locked the master lock before being called.
- */
-bool
-DeadLockCheck(PROC *thisProc, LOCK *findlock)
-{
- PROC *waitProc;
- PROC_QUEUE *waitQueue;
- SHM_QUEUE *procHolders = &(thisProc->procHolders);
- HOLDER *holder;
- HOLDER *nextHolder;
- LOCKMETHODCTL *lockctl = LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
- LOCK *lock;
- int i,
- j;
- bool first_run = (thisProc == MyProc);
-
- static PROC *checked_procs[MAXBACKENDS];
- static int nprocs;
-
- /* initialize at start of recursion */
- if (first_run)
- {
- checked_procs[0] = thisProc;
- nprocs = 1;
- }
-
- /*
- * Scan over all the locks held/awaited by thisProc.
- */
- holder = (HOLDER *) SHMQueueNext(procHolders, procHolders,
- offsetof(HOLDER, procLink));
-
- while (holder)
- {
- /* Get link first, since we may unlink/delete this holder */
- nextHolder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
- offsetof(HOLDER, procLink));
-
- Assert(holder->tag.proc == MAKE_OFFSET(thisProc));
-
- lock = (LOCK *) MAKE_PTR(holder->tag.lock);
-
- /* Ignore user locks */
- if (lock->tag.lockmethod != DEFAULT_LOCKMETHOD)
- goto nxtl;
-
- HOLDER_PRINT("DeadLockCheck", holder);
- LOCK_PRINT("DeadLockCheck", lock, 0);
-
- /*
- * waitLock is always in procHolders of waiting proc, if !first_run
- * then upper caller will handle waitProcs queue of waitLock.
- */
- if (thisProc->waitLock == lock && !first_run)
- goto nxtl;
-
- /*
- * If we found proc holding findlock and sleeping on some my other
- * lock then we have to check does it block me or another waiters.
- */
- if (lock == findlock && !first_run)
- {
- int lm;
-
- Assert(holder->nHolding > 0);
- for (lm = 1; lm <= lockctl->numLockModes; lm++)
- {
- if (holder->holding[lm] > 0 &&
- lockctl->conflictTab[lm] & findlock->waitMask)
- return true;
- }
-
- /*
- * Else - get the next lock from thisProc's procHolders
- */
- goto nxtl;
- }
-
- waitQueue = &(lock->waitProcs);
- waitProc = (PROC *) MAKE_PTR(waitQueue->links.next);
-
- /*
- * Inner loop scans over all processes waiting for this lock.
- *
- * NOTE: loop must count down because we want to examine each item
- * in the queue even if waitQueue->size decreases due to waking up
- * some of the processes.
- */
- for (i = waitQueue->size; --i >= 0; )
- {
- Assert(waitProc->waitLock == lock);
- if (waitProc == thisProc)
- {
- /* This should only happen at first level */
- Assert(waitProc == MyProc);
- goto nextWaitProc;
- }
- if (lock == findlock) /* first_run also true */
- {
- /*
- * If I'm blocked by his heldLocks...
- */
- if (lockctl->conflictTab[MyProc->waitLockMode] & waitProc->heldLocks)
- {
- /* and he blocked by me -> deadlock */
- if (lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks)
- return true;
- /* we shouldn't look at procHolders of our blockers */
- goto nextWaitProc;
- }
-
- /*
- * If he isn't blocked by me and we request
- * non-conflicting lock modes - no deadlock here because
- * he isn't blocked by me in any sense (explicitly or
- * implicitly). Note that we don't do like test if
- * !first_run (when thisProc is holder and non-waiter on
- * lock) and so we call DeadLockCheck below for every
- * waitProc in thisProc->procHolders, even for waitProc-s
- * un-blocked by thisProc. Should we? This could save us
- * some time...
- */
- if (!(lockctl->conflictTab[waitProc->waitLockMode] & MyProc->heldLocks) &&
- !(lockctl->conflictTab[waitProc->waitLockMode] & (1 << MyProc->waitLockMode)))
- goto nextWaitProc;
- }
-
- /*
- * Skip this waiter if already checked.
- */
- for (j = 0; j < nprocs; j++)
- {
- if (checked_procs[j] == waitProc)
- goto nextWaitProc;
- }
-
- /* Recursively check this process's procHolders. */
- Assert(nprocs < MAXBACKENDS);
- checked_procs[nprocs++] = waitProc;
-
- if (DeadLockCheck(waitProc, findlock))
- {
- int heldLocks;
-
- /*
- * Ok, but is waitProc waiting for me (thisProc) ?
- */
- if (thisProc->waitLock == lock)
- {
- Assert(first_run);
- heldLocks = thisProc->heldLocks;
- }
- else
- {
- /* should we cache heldLocks to speed this up? */
- heldLocks = LockGetMyHeldLocks(holder->tag.lock, thisProc);
- Assert(heldLocks != 0);
- }
- if (lockctl->conflictTab[waitProc->waitLockMode] & heldLocks)
- {
- /*
- * Last attempt to avoid deadlock: try to wakeup myself.
- */
- if (first_run)
- {
- if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
- MyProc->waitLockMode,
- MyProc->waitLock,
- MyProc->waitHolder,
- MyProc,
- NULL) == STATUS_OK)
- {
- GrantLock(MyProc->waitLock,
- MyProc->waitHolder,
- MyProc->waitLockMode);
- ProcWakeup(MyProc, NO_ERROR);
- return false;
- }
- }
- return true;
- }
-
- /*
- * Hell! Is he blocked by any (other) holder ?
- */
- if (LockResolveConflicts(DEFAULT_LOCKMETHOD,
- waitProc->waitLockMode,
- lock,
- waitProc->waitHolder,
- waitProc,
- NULL) != STATUS_OK)
- {
- /*
- * Blocked by others - no deadlock...
- */
- LOCK_PRINT("DeadLockCheck: blocked by others",
- lock, waitProc->waitLockMode);
- goto nextWaitProc;
- }
-
- /*
- * Well - wakeup this guy! This is the case of
- * implicit blocking: thisProc blocked someone who
- * blocked waitProc by the fact that he/someone is
- * already waiting for lock. We do this for
- * anti-starving.
- */
- GrantLock(lock, waitProc->waitHolder, waitProc->waitLockMode);
- waitProc = ProcWakeup(waitProc, NO_ERROR);
- /*
- * Use next-proc link returned by ProcWakeup, since this
- * proc's own links field is now cleared.
- */
- continue;
- }
-
-nextWaitProc:
- waitProc = (PROC *) MAKE_PTR(waitProc->links.next);
- }
-
-nxtl:
- holder = nextHolder;
- }
-
- /* if we got here, no deadlock */
- return false;
-}
#ifdef LOCK_DEBUG
/*
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.97 2001/01/25 03:31:16 tgl Exp $
*
*-------------------------------------------------------------------------
*/
*
*
* Interface (a):
- * ProcSleep(), ProcWakeup(), ProcWakeupNext(),
+ * ProcSleep(), ProcWakeup(),
* ProcQueueAlloc() -- create a shm queue for sleeping processes
* ProcQueueInit() -- create a queue without allocing memory
*
* shared among backends (we keep a few sets of semaphores around).
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
- *
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.96 2001/01/24 19:43:08 momjian Exp $
*/
#include "postgres.h"
}
SHMQueueElemInit(&(MyProc->links));
- MyProc->errType = NO_ERROR;
+ MyProc->errType = STATUS_OK;
MyProc->pid = MyProcPid;
MyProc->databaseId = MyDatabaseId;
MyProc->xid = InvalidTransactionId;
(location != MAKE_OFFSET(MyProc)))
elog(STOP, "InitProcess: ShmemPID table broken");
+ /*
+ * Arrange to clean up at backend exit.
+ */
on_shmem_exit(ProcKill, 0);
+
+ /*
+ * Now that we have a PROC, we could try to acquire locks,
+ * so initialize the deadlock checker.
+ */
+ InitDeadLockChecking();
}
/*
}
}
-/*
- * Remove a proc from the wait-queue it is on
- * (caller must know it is on one).
- * Locktable lock must be held by caller.
- *
- * NB: this does not remove the process' holder object, nor the lock object,
- * even though their counts might now have gone to zero. That will happen
- * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup. (Removal of a proc from its wait queue by
- * this routine can only happen if we are aborting the transaction.)
- */
-static void
-RemoveFromWaitQueue(PROC *proc)
-{
- LOCK *waitLock = proc->waitLock;
- LOCKMODE lockmode = proc->waitLockMode;
-
- /* Make sure proc is waiting */
- Assert(proc->links.next != INVALID_OFFSET);
- Assert(waitLock);
- Assert(waitLock->waitProcs.size > 0);
-
- /* Remove proc from lock's wait queue */
- SHMQueueDelete(&(proc->links));
- waitLock->waitProcs.size--;
-
- /* Undo increments of request counts by waiting process */
- Assert(waitLock->nRequested > 0);
- Assert(waitLock->nRequested > proc->waitLock->nGranted);
- waitLock->nRequested--;
- Assert(waitLock->requested[lockmode] > 0);
- waitLock->requested[lockmode]--;
- /* don't forget to clear waitMask bit if appropriate */
- if (waitLock->granted[lockmode] == waitLock->requested[lockmode])
- waitLock->waitMask &= ~(1 << lockmode);
-
- /* Clean up the proc's own state */
- proc->waitLock = NULL;
- proc->waitHolder = NULL;
-
- /* See if any other waiters for the lock can be woken up now */
- ProcLockWakeup(LOCK_LOCKMETHOD(*waitLock), waitLock);
-}
-
/*
* Cancel any pending wait for lock, when aborting a transaction.
*
/*
* ProcSleep -- put a process to sleep
*
- * P() on the semaphore should put us to sleep. The process
- * semaphore is normally zero, so when we try to acquire it, we sleep.
+ * Caller must have set MyProc->heldLocks to reflect locks already held
+ * on the lockable object by this process (under all XIDs).
*
* Locktable's spinlock must be held at entry, and will be held
* at exit.
*
- * Result is NO_ERROR if we acquired the lock, STATUS_ERROR if not (deadlock).
+ * Result: STATUS_OK if we acquired the lock, STATUS_ERROR if not (deadlock).
*
* ASSUME: that no one will fiddle with the queue until after
* we release the spin lock.
*
* NOTES: The process queue is now a priority queue for locking.
+ *
+ * P() on the semaphore should put us to sleep. The process
+ * semaphore is normally zero, so when we try to acquire it, we sleep.
*/
int
-ProcSleep(LOCKMETHODCTL *lockctl,
+ProcSleep(LOCKMETHODTABLE *lockMethodTable,
LOCKMODE lockmode,
LOCK *lock,
HOLDER *holder)
{
- PROC_QUEUE *waitQueue = &(lock->waitProcs);
+ LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
SPINLOCK spinlock = lockctl->masterLock;
- int myMask = (1 << lockmode);
- int waitMask = lock->waitMask;
+ PROC_QUEUE *waitQueue = &(lock->waitProcs);
+ int myHeldLocks = MyProc->heldLocks;
PROC *proc;
int i;
- int aheadGranted[MAX_LOCKMODES];
- bool selfConflict = (lockctl->conflictTab[lockmode] & myMask),
- prevSame = false;
#ifndef __BEOS__
struct itimerval timeval,
dummy;
bigtime_t time_interval;
#endif
- proc = (PROC *) MAKE_PTR(waitQueue->links.next);
-
- /* if we don't conflict with any waiter - be first in queue */
- if (!(lockctl->conflictTab[lockmode] & waitMask))
- goto ins;
-
- /* otherwise, determine where we should go into the queue */
- for (i = 1; i < MAX_LOCKMODES; i++)
- aheadGranted[i] = lock->granted[i];
- (aheadGranted[lockmode])++;
-
- for (i = 0; i < waitQueue->size; i++)
+ /* ----------------------
+ * Determine where to add myself in the wait queue.
+ *
+ * Normally I should go at the end of the queue. However, if I already
+ * hold locks that conflict with the request of any previous waiter,
+ * put myself in the queue just in front of the first such waiter.
+ * This is not a necessary step, since deadlock detection would move
+ * me to before that waiter anyway; but it's relatively cheap to detect
+ * such a conflict immediately, and avoid delaying till deadlock timeout.
+ *
+ * Special case: if I find I should go in front of the first waiter,
+ * and I do not conflict with already-held locks, then just grant myself
+ * the requested lock immediately.
+ * ----------------------
+ */
+ if (myHeldLocks != 0)
{
- LOCKMODE procWaitMode = proc->waitLockMode;
-
- /* must I wait for him ? */
- if (lockctl->conflictTab[lockmode] & proc->heldLocks)
+ proc = (PROC *) MAKE_PTR(waitQueue->links.next);
+ for (i = 0; i < waitQueue->size; i++)
{
- /* is he waiting for me ? */
- if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
+ /* Must he wait for me? */
+ if (lockctl->conflictTab[proc->waitLockMode] & myHeldLocks)
{
- /* Yes, report deadlock failure */
- MyProc->errType = STATUS_ERROR;
- return STATUS_ERROR;
- }
- /* I must go after him in queue - so continue loop */
- }
- /* if he waits for me, go before him in queue */
- else if (lockctl->conflictTab[procWaitMode] & MyProc->heldLocks)
- break;
- /* if conflicting locks requested */
- else if (lockctl->conflictTab[procWaitMode] & myMask)
- {
-
- /*
- * If I request non self-conflicting lock and there are others
- * requesting the same lock just before this guy - stop here.
- */
- if (!selfConflict && prevSame)
+ /* Must I wait for him ? */
+ if (lockctl->conflictTab[lockmode] & proc->heldLocks)
+ {
+ /* Yes, can report deadlock failure immediately */
+ MyProc->errType = STATUS_ERROR;
+ return STATUS_ERROR;
+ }
+ if (i == 0)
+ {
+ /* I must go before first waiter. Check special case. */
+ if (LockCheckConflicts(lockMethodTable,
+ lockmode,
+ lock,
+ holder,
+ MyProc,
+ NULL) == STATUS_OK)
+ {
+ /* Skip the wait and just grant myself the lock. */
+ GrantLock(lock, holder, lockmode);
+ return STATUS_OK;
+ }
+ }
+ /* Break out of loop to put myself before him */
break;
+ }
+ proc = (PROC *) MAKE_PTR(proc->links.next);
}
-
- /*
- * Last attempt to not move any further to the back of the queue:
- * if we don't conflict with remaining waiters, stop here.
- */
- else if (!(lockctl->conflictTab[lockmode] & waitMask))
- break;
-
- /* Move past this guy, and update state accordingly */
- prevSame = (procWaitMode == lockmode);
- (aheadGranted[procWaitMode])++;
- if (aheadGranted[procWaitMode] == lock->requested[procWaitMode])
- waitMask &= ~(1 << procWaitMode);
- proc = (PROC *) MAKE_PTR(proc->links.next);
+ }
+ else
+ {
+ /* I hold no locks, so I can't push in front of anyone. */
+ proc = (PROC *) &(waitQueue->links);
}
-ins:;
/* -------------------
* Insert self into queue, ahead of the given proc (or at tail of queue).
* -------------------
SHMQueueInsertBefore(&(proc->links), &(MyProc->links));
waitQueue->size++;
- lock->waitMask |= myMask;
+ lock->waitMask |= (1 << lockmode);
/* Set up wait information in PROC object, too */
MyProc->waitLock = lock;
MyProc->waitHolder = holder;
MyProc->waitLockMode = lockmode;
- /* We assume the caller set up MyProc->heldLocks */
- MyProc->errType = NO_ERROR; /* initialize result for success */
+ MyProc->errType = STATUS_OK; /* initialize result for success */
/* mark that we are waiting for a lock */
waitingForLock = true;
* By delaying the check until we've waited for a bit, we can avoid
* running the rather expensive deadlock-check code in most cases.
*
- * Need to zero out struct to set the interval and the micro seconds fields
+ * Need to zero out struct to set the interval and the microseconds fields
* to 0.
* --------------
*/
/*
* ProcLockWakeup -- routine for waking up processes when a lock is
- * released.
+ * released (or a prior waiter is aborted). Scan all waiters
+ * for lock, waken any that are no longer blocked.
*/
-int
-ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock)
+void
+ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock)
{
- PROC_QUEUE *queue = &(lock->waitProcs);
+ LOCKMETHODCTL *lockctl = lockMethodTable->ctl;
+ PROC_QUEUE *waitQueue = &(lock->waitProcs);
+ int queue_size = waitQueue->size;
PROC *proc;
- int awoken = 0;
- LOCKMODE last_lockmode = 0;
- int queue_size = queue->size;
+ int conflictMask = 0;
Assert(queue_size >= 0);
- if (!queue_size)
- return STATUS_NOT_FOUND;
+ if (queue_size == 0)
+ return;
- proc = (PROC *) MAKE_PTR(queue->links.next);
+ proc = (PROC *) MAKE_PTR(waitQueue->links.next);
while (queue_size-- > 0)
{
- if (proc->waitLockMode == last_lockmode)
- {
- /*
- * This proc will conflict as the previous one did, don't even
- * try.
- */
- goto nextProc;
- }
+ LOCKMODE lockmode = proc->waitLockMode;
/*
- * Does this proc conflict with locks held by others ?
+ * Waken if (a) doesn't conflict with requests of earlier waiters,
+ * and (b) doesn't conflict with already-held locks.
*/
- if (LockResolveConflicts(lockmethod,
- proc->waitLockMode,
- lock,
- proc->waitHolder,
- proc,
- NULL) != STATUS_OK)
+ if (((1 << lockmode) & conflictMask) == 0 &&
+ LockCheckConflicts(lockMethodTable,
+ lockmode,
+ lock,
+ proc->waitHolder,
+ proc,
+ NULL) == STATUS_OK)
{
- /* Yes. Quit if we already awoke at least one process. */
- if (awoken != 0)
- break;
- /* Otherwise, see if any later waiters can be awoken. */
- last_lockmode = proc->waitLockMode;
- goto nextProc;
+ /* OK to waken */
+ GrantLock(lock, proc->waitHolder, lockmode);
+ proc = ProcWakeup(proc, STATUS_OK);
+ /*
+ * ProcWakeup removes proc from the lock's waiting process queue
+ * and returns the next proc in chain; don't use proc's next-link,
+ * because it's been cleared.
+ */
}
-
- /*
- * OK to wake up this sleeping process.
- */
- GrantLock(lock, proc->waitHolder, proc->waitLockMode);
- proc = ProcWakeup(proc, NO_ERROR);
- awoken++;
-
- /*
- * ProcWakeup removes proc from the lock's waiting process queue
- * and returns the next proc in chain; don't use proc's next-link,
- * because it's been cleared.
- */
- continue;
-
-nextProc:
- proc = (PROC *) MAKE_PTR(proc->links.next);
- }
-
- Assert(queue->size >= 0);
-
- if (awoken)
- return STATUS_OK;
- else
- {
- /* Something is still blocking us. May have deadlocked. */
-#ifdef LOCK_DEBUG
- if (lock->tag.lockmethod == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks)
+ else
{
- elog(DEBUG, "ProcLockWakeup: lock(%lx) can't wake up any process",
- MAKE_OFFSET(lock));
- if (Debug_deadlocks)
- DumpAllLocks();
+ /* Cannot wake this guy. Add his request to conflict mask. */
+ conflictMask |= lockctl->conflictTab[lockmode];
+ proc = (PROC *) MAKE_PTR(proc->links.next);
}
-#endif
- return STATUS_NOT_FOUND;
}
+
+ Assert(waitQueue->size >= 0);
}
/* --------------------
DumpAllLocks();
#endif
- if (!DeadLockCheck(MyProc, MyProc->waitLock))
+ if (!DeadLockCheck(MyProc))
{
/* No deadlock, so keep waiting */
UnlockLockTable();
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: lock.h,v 1.43 2001/01/24 19:43:27 momjian Exp $
+ * $Id: lock.h,v 1.44 2001/01/25 03:31:16 tgl Exp $
*
*-------------------------------------------------------------------------
*/
extern void InitLocks(void);
extern void LockDisable(bool status);
extern bool LockingDisabled(void);
+extern LOCKMETHODTABLE *GetLocksMethodTable(LOCK *lock);
extern LOCKMETHOD LockMethodTableInit(char *tabName, LOCKMASK *conflictsP,
int *prioP, int numModes, int maxBackends);
extern LOCKMETHOD LockMethodTableRename(LOCKMETHOD lockmethod);
TransactionId xid, LOCKMODE lockmode);
extern bool LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
bool allxids, TransactionId xid);
-extern int LockResolveConflicts(LOCKMETHOD lockmethod, LOCKMODE lockmode,
- LOCK *lock, HOLDER *holder, PROC *proc,
- int *myHolding);
+extern int LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
+ LOCKMODE lockmode,
+ LOCK *lock, HOLDER *holder, PROC *proc,
+ int *myHolding);
extern void GrantLock(LOCK *lock, HOLDER *holder, LOCKMODE lockmode);
+extern void RemoveFromWaitQueue(PROC *proc);
extern int LockShmemSize(int maxBackends);
-extern bool DeadLockCheck(PROC *thisProc, LOCK *findlock);
+extern bool DeadLockCheck(PROC *proc);
+extern void InitDeadLockChecking(void);
#ifdef LOCK_DEBUG
extern void DumpLocks(void);
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: proc.h,v 1.38 2001/01/24 19:43:28 momjian Exp $
+ * $Id: proc.h,v 1.39 2001/01/25 03:31:16 tgl Exp $
*
*-------------------------------------------------------------------------
*/
SHM_QUEUE links; /* list link if process is in a list */
SEMA sem; /* ONE semaphore to sleep on */
- int errType; /* error code tells why we woke up */
+ int errType; /* STATUS_OK or STATUS_ERROR after wakeup */
TransactionId xid; /* transaction currently being executed by
* this proc */
if (MyProc) (MyProc->sLocks[(lock)])--; \
} while (0)
-/*
- * flags explaining why process woke up
- */
-#define NO_ERROR 0
-#define ERR_TIMEOUT 1
-#define ERR_BUFFER_IO 2
-
/*
* There is one ProcGlobal struct for the whole installation.
extern bool ProcRemove(int pid);
extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int ProcSleep(LOCKMETHODCTL *lockctl, LOCKMODE lockmode,
+extern int ProcSleep(LOCKMETHODTABLE *lockMethodTable, LOCKMODE lockmode,
LOCK *lock, HOLDER *holder);
extern PROC *ProcWakeup(PROC *proc, int errType);
-extern int ProcLockWakeup(LOCKMETHOD lockmethod, LOCK *lock);
+extern void ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock);
extern void ProcReleaseSpins(PROC *proc);
extern bool LockWaitCancel(void);
extern void HandleDeadLock(SIGNAL_ARGS);