]> granicus.if.org Git - postgresql/blob - src/backend/storage/lmgr/deadlock.c
Complete TODO item:
[postgresql] / src / backend / storage / lmgr / deadlock.c
1 /*-------------------------------------------------------------------------
2  *
3  * deadlock.c
4  *        POSTGRES deadlock detection code
5  *
6  * See src/backend/storage/lmgr/README for a description of the deadlock
7  * detection and resolution algorithms.
8  *
9  *
10  * Portions Copyright (c) 1996-2002, PostgreSQL Global Development Group
11  * Portions Copyright (c) 1994, Regents of the University of California
12  *
13  *
14  * IDENTIFICATION
15  *        $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.11 2002/07/18 23:06:19 momjian Exp $
16  *
17  *      Interface:
18  *
19  *      DeadLockCheck()
20  *      InitDeadLockChecking()
21  *
22  *-------------------------------------------------------------------------
23  */
24 #include "postgres.h"
25
26 #include "miscadmin.h"
27 #include "storage/proc.h"
28 #include "utils/memutils.h"
29
30
31 /* One edge in the waits-for graph */
32 typedef struct
33 {
34         PGPROC     *waiter;                     /* the waiting process */
35         PGPROC     *blocker;            /* the process it is waiting for */
36         int                     pred;                   /* workspace for TopoSort */
37         int                     link;                   /* workspace for TopoSort */
38 } EDGE;
39
40 /* One potential reordering of a lock's wait queue */
41 typedef struct
42 {
43         LOCK       *lock;                       /* the lock whose wait queue is described */
44         PGPROC    **procs;                      /* array of PGPROC *'s in new wait order */
45         int                     nProcs;
46 } WAIT_ORDER;
47
48
49 static bool DeadLockCheckRecurse(PGPROC *proc);
50 static bool TestConfiguration(PGPROC *startProc);
51 static bool FindLockCycle(PGPROC *checkProc,
52                           EDGE *softEdges, int *nSoftEdges);
53 static bool FindLockCycleRecurse(PGPROC *checkProc,
54                                          EDGE *softEdges, int *nSoftEdges);
55 static bool ExpandConstraints(EDGE *constraints, int nConstraints);
56 static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
57                  PGPROC **ordering);
58
59 #ifdef DEBUG_DEADLOCK
60 static void PrintLockQueue(LOCK *lock, const char *info);
61 #endif
62
63
64 /*
65  * Working space for the deadlock detector
66  */
67
68 /* Workspace for FindLockCycle */
69 static PGPROC **visitedProcs;           /* Array of visited procs */
70 static int      nVisitedProcs;
71
72 /* Workspace for TopoSort */
73 static PGPROC **topoProcs;              /* Array of not-yet-output procs */
74 static int *beforeConstraints;  /* Counts of remaining before-constraints */
75 static int *afterConstraints;   /* List head for after-constraints */
76
77 /* Output area for ExpandConstraints */
78 static WAIT_ORDER *waitOrders;  /* Array of proposed queue rearrangements */
79 static int      nWaitOrders;
80 static PGPROC **waitOrderProcs; /* Space for waitOrders queue contents */
81
82 /* Current list of constraints being considered */
83 static EDGE *curConstraints;
84 static int      nCurConstraints;
85 static int      maxCurConstraints;
86
87 /* Storage space for results from FindLockCycle */
88 static EDGE *possibleConstraints;
89 static int      nPossibleConstraints;
90 static int      maxPossibleConstraints;
91
92
93 /*
94  * InitDeadLockChecking -- initialize deadlock checker during backend startup
95  *
96  * This does per-backend initialization of the deadlock checker; primarily,
97  * allocation of working memory for DeadLockCheck.      We do this per-backend
98  * since there's no percentage in making the kernel do copy-on-write
99  * inheritance of workspace from the postmaster.  We want to allocate the
100  * space at startup because the deadlock checker might be invoked when there's
101  * no free memory left.
102  */
103 void
104 InitDeadLockChecking(void)
105 {
106         MemoryContext oldcxt;
107
108         /* Make sure allocations are permanent */
109         oldcxt = MemoryContextSwitchTo(TopMemoryContext);
110
111         /*
112          * FindLockCycle needs at most MaxBackends entries in visitedProcs[]
113          */
114         visitedProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));
115
116         /*
117          * TopoSort needs to consider at most MaxBackends wait-queue entries,
118          * and it needn't run concurrently with FindLockCycle.
119          */
120         topoProcs = visitedProcs;       /* re-use this space */
121         beforeConstraints = (int *) palloc(MaxBackends * sizeof(int));
122         afterConstraints = (int *) palloc(MaxBackends * sizeof(int));
123
124         /*
125          * We need to consider rearranging at most MaxBackends/2 wait queues
126          * (since it takes at least two waiters in a queue to create a soft
127          * edge), and the expanded form of the wait queues can't involve more
128          * than MaxBackends total waiters.
129          */
130         waitOrders = (WAIT_ORDER *) palloc((MaxBackends / 2) * sizeof(WAIT_ORDER));
131         waitOrderProcs = (PGPROC **) palloc(MaxBackends * sizeof(PGPROC *));
132
133         /*
134          * Allow at most MaxBackends distinct constraints in a configuration.
135          * (Is this enough?  In practice it seems it should be, but I don't
136          * quite see how to prove it.  If we run out, we might fail to find a
137          * workable wait queue rearrangement even though one exists.)  NOTE
138          * that this number limits the maximum recursion depth of
139          * DeadLockCheckRecurse. Making it really big might potentially allow
140          * a stack-overflow problem.
141          */
142         maxCurConstraints = MaxBackends;
143         curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));
144
145         /*
146          * Allow up to 3*MaxBackends constraints to be saved without having to
147          * re-run TestConfiguration.  (This is probably more than enough, but
148          * we can survive if we run low on space by doing excess runs of
149          * TestConfiguration to re-compute constraint lists each time needed.)
150          * The last MaxBackends entries in possibleConstraints[] are reserved
151          * as output workspace for FindLockCycle.
152          */
153         maxPossibleConstraints = MaxBackends * 4;
154         possibleConstraints =
155                 (EDGE *) palloc(maxPossibleConstraints * sizeof(EDGE));
156
157         MemoryContextSwitchTo(oldcxt);
158 }
159
160 /*
161  * DeadLockCheck -- Checks for deadlocks for a given process
162  *
163  * This code looks for deadlocks involving the given process.  If any
164  * are found, it tries to rearrange lock wait queues to resolve the
165  * deadlock.  If resolution is impossible, return TRUE --- the caller
166  * is then expected to abort the given proc's transaction.
167  *
168  * We can't block on user locks, so no sense testing for deadlock
169  * because there is no blocking, and no timer for the block.  So,
170  * only look at regular locks.
171  *
172  * We must have already locked the master lock before being called.
173  */
174 bool
175 DeadLockCheck(PGPROC *proc)
176 {
177         int                     i,
178                                 j;
179
180         /* Initialize to "no constraints" */
181         nCurConstraints = 0;
182         nPossibleConstraints = 0;
183         nWaitOrders = 0;
184
185         /* Search for deadlocks and possible fixes */
186         if (DeadLockCheckRecurse(proc))
187                 return true;                    /* cannot find a non-deadlocked state */
188
189         /* Apply any needed rearrangements of wait queues */
190         for (i = 0; i < nWaitOrders; i++)
191         {
192                 LOCK       *lock = waitOrders[i].lock;
193                 PGPROC    **procs = waitOrders[i].procs;
194                 int                     nProcs = waitOrders[i].nProcs;
195                 PROC_QUEUE *waitQueue = &(lock->waitProcs);
196
197                 Assert(nProcs == waitQueue->size);
198
199 #ifdef DEBUG_DEADLOCK
200                 PrintLockQueue(lock, "DeadLockCheck:");
201 #endif
202
203                 /* Reset the queue and re-add procs in the desired order */
204                 ProcQueueInit(waitQueue);
205                 for (j = 0; j < nProcs; j++)
206                 {
207                         SHMQueueInsertBefore(&(waitQueue->links), &(procs[j]->links));
208                         waitQueue->size++;
209                 }
210
211 #ifdef DEBUG_DEADLOCK
212                 PrintLockQueue(lock, "rearranged to:");
213 #endif
214
215                 /* See if any waiters for the lock can be woken up now */
216                 ProcLockWakeup(GetLocksMethodTable(lock), lock);
217         }
218         return false;
219 }
220
221 /*
222  * DeadLockCheckRecurse -- recursively search for valid orderings
223  *
224  * curConstraints[] holds the current set of constraints being considered
225  * by an outer level of recursion.      Add to this each possible solution
226  * constraint for any cycle detected at this level.
227  *
228  * Returns TRUE if no solution exists.  Returns FALSE if a deadlock-free
229  * state is attainable, in which case waitOrders[] shows the required
230  * rearrangements of lock wait queues (if any).
231  */
232 static bool
233 DeadLockCheckRecurse(PGPROC *proc)
234 {
235         int                     nEdges;
236         int                     oldPossibleConstraints;
237         bool            savedList;
238         int                     i;
239
240         nEdges = TestConfiguration(proc);
241         if (nEdges < 0)
242                 return true;                    /* hard deadlock --- no solution */
243         if (nEdges == 0)
244                 return false;                   /* good configuration found */
245         if (nCurConstraints >= maxCurConstraints)
246                 return true;                    /* out of room for active constraints? */
247         oldPossibleConstraints = nPossibleConstraints;
248         if (nPossibleConstraints + nEdges + MaxBackends <= maxPossibleConstraints)
249         {
250                 /* We can save the edge list in possibleConstraints[] */
251                 nPossibleConstraints += nEdges;
252                 savedList = true;
253         }
254         else
255         {
256                 /* Not room; will need to regenerate the edges on-the-fly */
257                 savedList = false;
258         }
259
260         /*
261          * Try each available soft edge as an addition to the configuration.
262          */
263         for (i = 0; i < nEdges; i++)
264         {
265                 if (!savedList && i > 0)
266                 {
267                         /* Regenerate the list of possible added constraints */
268                         if (nEdges != TestConfiguration(proc))
269                                 elog(FATAL, "DeadLockCheckRecurse: inconsistent results");
270                 }
271                 curConstraints[nCurConstraints] =
272                         possibleConstraints[oldPossibleConstraints + i];
273                 nCurConstraints++;
274                 if (!DeadLockCheckRecurse(proc))
275                         return false;           /* found a valid solution! */
276                 /* give up on that added constraint, try again */
277                 nCurConstraints--;
278         }
279         nPossibleConstraints = oldPossibleConstraints;
280         return true;                            /* no solution found */
281 }
282
283
284 /*--------------------
285  * Test a configuration (current set of constraints) for validity.
286  *
287  * Returns:
288  *              0: the configuration is good (no deadlocks)
289  *         -1: the configuration has a hard deadlock or is not self-consistent
290  *              >0: the configuration has one or more soft deadlocks
291  *
292  * In the soft-deadlock case, one of the soft cycles is chosen arbitrarily
293  * and a list of its soft edges is returned beginning at
294  * possibleConstraints+nPossibleConstraints.  The return value is the
295  * number of soft edges.
296  *--------------------
297  */
298 static bool
299 TestConfiguration(PGPROC *startProc)
300 {
301         int                     softFound = 0;
302         EDGE       *softEdges = possibleConstraints + nPossibleConstraints;
303         int                     nSoftEdges;
304         int                     i;
305
306         /*
307          * Make sure we have room for FindLockCycle's output.
308          */
309         if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
310                 return -1;
311
312         /*
313          * Expand current constraint set into wait orderings.  Fail if the
314          * constraint set is not self-consistent.
315          */
316         if (!ExpandConstraints(curConstraints, nCurConstraints))
317                 return -1;
318
319         /*
320          * Check for cycles involving startProc or any of the procs mentioned
321          * in constraints.      We check startProc last because if it has a soft
322          * cycle still to be dealt with, we want to deal with that first.
323          */
324         for (i = 0; i < nCurConstraints; i++)
325         {
326                 if (FindLockCycle(curConstraints[i].waiter, softEdges, &nSoftEdges))
327                 {
328                         if (nSoftEdges == 0)
329                                 return -1;              /* hard deadlock detected */
330                         softFound = nSoftEdges;
331                 }
332                 if (FindLockCycle(curConstraints[i].blocker, softEdges, &nSoftEdges))
333                 {
334                         if (nSoftEdges == 0)
335                                 return -1;              /* hard deadlock detected */
336                         softFound = nSoftEdges;
337                 }
338         }
339         if (FindLockCycle(startProc, softEdges, &nSoftEdges))
340         {
341                 if (nSoftEdges == 0)
342                         return -1;                      /* hard deadlock detected */
343                 softFound = nSoftEdges;
344         }
345         return softFound;
346 }
347
348
349 /*
350  * FindLockCycle -- basic check for deadlock cycles
351  *
352  * Scan outward from the given proc to see if there is a cycle in the
353  * waits-for graph that includes this proc.  Return TRUE if a cycle
354  * is found, else FALSE.  If a cycle is found, we also return a list of
355  * the "soft edges", if any, included in the cycle.  These edges could
356  * potentially be eliminated by rearranging wait queues.
357  *
358  * Since we need to be able to check hypothetical configurations that would
359  * exist after wait queue rearrangement, the routine pays attention to the
360  * table of hypothetical queue orders in waitOrders[].  These orders will
361  * be believed in preference to the actual ordering seen in the locktable.
362  */
363 static bool
364 FindLockCycle(PGPROC *checkProc,
365                           EDGE *softEdges,      /* output argument */
366                           int *nSoftEdges)      /* output argument */
367 {
368         nVisitedProcs = 0;
369         *nSoftEdges = 0;
370         return FindLockCycleRecurse(checkProc, softEdges, nSoftEdges);
371 }
372
373 static bool
374 FindLockCycleRecurse(PGPROC *checkProc,
375                                          EDGE *softEdges,       /* output argument */
376                                          int *nSoftEdges)       /* output argument */
377 {
378         PGPROC     *proc;
379         LOCK       *lock;
380         HOLDER     *holder;
381         SHM_QUEUE  *lockHolders;
382         LOCKMETHODTABLE *lockMethodTable;
383         PROC_QUEUE *waitQueue;
384         int                     queue_size;
385         int                     conflictMask;
386         int                     i;
387         int                     numLockModes,
388                                 lm;
389
390         /*
391          * Have we already seen this proc?
392          */
393         for (i = 0; i < nVisitedProcs; i++)
394         {
395                 if (visitedProcs[i] == checkProc)
396                 {
397                         /* If we return to starting point, we have a deadlock cycle */
398                         if (i == 0)
399                                 return true;
400
401                         /*
402                          * Otherwise, we have a cycle but it does not include the
403                          * start point, so say "no deadlock".
404                          */
405                         return false;
406                 }
407         }
408         /* Mark proc as seen */
409         Assert(nVisitedProcs < MaxBackends);
410         visitedProcs[nVisitedProcs++] = checkProc;
411
412         /*
413          * If the proc is not waiting, we have no outgoing waits-for edges.
414          */
415         if (checkProc->links.next == INVALID_OFFSET)
416                 return false;
417         lock = checkProc->waitLock;
418         if (lock == NULL)
419                 return false;
420         lockMethodTable = GetLocksMethodTable(lock);
421         numLockModes = lockMethodTable->numLockModes;
422         conflictMask = lockMethodTable->conflictTab[checkProc->waitLockMode];
423
424         /*
425          * Scan for procs that already hold conflicting locks.  These are
426          * "hard" edges in the waits-for graph.
427          */
428         lockHolders = &(lock->lockHolders);
429
430         holder = (HOLDER *) SHMQueueNext(lockHolders, lockHolders,
431                                                                          offsetof(HOLDER, lockLink));
432
433         while (holder)
434         {
435                 proc = (PGPROC *) MAKE_PTR(holder->tag.proc);
436
437                 /* A proc never blocks itself */
438                 if (proc != checkProc)
439                 {
440                         for (lm = 1; lm <= numLockModes; lm++)
441                         {
442                                 if (holder->holding[lm] > 0 &&
443                                         ((1 << lm) & conflictMask) != 0)
444                                 {
445                                         /* This proc hard-blocks checkProc */
446                                         if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
447                                                 return true;
448                                         /* If no deadlock, we're done looking at this holder */
449                                         break;
450                                 }
451                         }
452                 }
453
454                 holder = (HOLDER *) SHMQueueNext(lockHolders, &holder->lockLink,
455                                                                                  offsetof(HOLDER, lockLink));
456         }
457
458         /*
459          * Scan for procs that are ahead of this one in the lock's wait queue.
460          * Those that have conflicting requests soft-block this one.  This
461          * must be done after the hard-block search, since if another proc
462          * both hard- and soft-blocks this one, we want to call it a hard
463          * edge.
464          *
465          * If there is a proposed re-ordering of the lock's wait order, use that
466          * rather than the current wait order.
467          */
468         for (i = 0; i < nWaitOrders; i++)
469         {
470                 if (waitOrders[i].lock == lock)
471                         break;
472         }
473
474         if (i < nWaitOrders)
475         {
476                 /* Use the given hypothetical wait queue order */
477                 PGPROC    **procs = waitOrders[i].procs;
478
479                 queue_size = waitOrders[i].nProcs;
480
481                 for (i = 0; i < queue_size; i++)
482                 {
483                         proc = procs[i];
484
485                         /* Done when we reach the target proc */
486                         if (proc == checkProc)
487                                 break;
488
489                         /* Is there a conflict with this guy's request? */
490                         if (((1 << proc->waitLockMode) & conflictMask) != 0)
491                         {
492                                 /* This proc soft-blocks checkProc */
493                                 if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
494                                 {
495                                         /*
496                                          * Add this edge to the list of soft edges in the
497                                          * cycle
498                                          */
499                                         Assert(*nSoftEdges < MaxBackends);
500                                         softEdges[*nSoftEdges].waiter = checkProc;
501                                         softEdges[*nSoftEdges].blocker = proc;
502                                         (*nSoftEdges)++;
503                                         return true;
504                                 }
505                         }
506                 }
507         }
508         else
509         {
510                 /* Use the true lock wait queue order */
511                 waitQueue = &(lock->waitProcs);
512                 queue_size = waitQueue->size;
513
514                 proc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
515
516                 while (queue_size-- > 0)
517                 {
518                         /* Done when we reach the target proc */
519                         if (proc == checkProc)
520                                 break;
521
522                         /* Is there a conflict with this guy's request? */
523                         if (((1 << proc->waitLockMode) & conflictMask) != 0)
524                         {
525                                 /* This proc soft-blocks checkProc */
526                                 if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
527                                 {
528                                         /*
529                                          * Add this edge to the list of soft edges in the
530                                          * cycle
531                                          */
532                                         Assert(*nSoftEdges < MaxBackends);
533                                         softEdges[*nSoftEdges].waiter = checkProc;
534                                         softEdges[*nSoftEdges].blocker = proc;
535                                         (*nSoftEdges)++;
536                                         return true;
537                                 }
538                         }
539
540                         proc = (PGPROC *) MAKE_PTR(proc->links.next);
541                 }
542         }
543
544         /*
545          * No conflict detected here.
546          */
547         return false;
548 }
549
550
551 /*
552  * ExpandConstraints -- expand a list of constraints into a set of
553  *              specific new orderings for affected wait queues
554  *
555  * Input is a list of soft edges to be reversed.  The output is a list
556  * of nWaitOrders WAIT_ORDER structs in waitOrders[], with PGPROC array
557  * workspace in waitOrderProcs[].
558  *
559  * Returns TRUE if able to build an ordering that satisfies all the
560  * constraints, FALSE if not (there are contradictory constraints).
561  */
562 static bool
563 ExpandConstraints(EDGE *constraints,
564                                   int nConstraints)
565 {
566         int                     nWaitOrderProcs = 0;
567         int                     i,
568                                 j;
569
570         nWaitOrders = 0;
571
572         /*
573          * Scan constraint list backwards.      This is because the last-added
574          * constraint is the only one that could fail, and so we want to test
575          * it for inconsistency first.
576          */
577         for (i = nConstraints; --i >= 0;)
578         {
579                 PGPROC     *proc = constraints[i].waiter;
580                 LOCK       *lock = proc->waitLock;
581
582                 /* Did we already make a list for this lock? */
583                 for (j = nWaitOrders; --j >= 0;)
584                 {
585                         if (waitOrders[j].lock == lock)
586                                 break;
587                 }
588                 if (j >= 0)
589                         continue;
590                 /* No, so allocate a new list */
591                 waitOrders[nWaitOrders].lock = lock;
592                 waitOrders[nWaitOrders].procs = waitOrderProcs + nWaitOrderProcs;
593                 waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
594                 nWaitOrderProcs += lock->waitProcs.size;
595                 Assert(nWaitOrderProcs <= MaxBackends);
596
597                 /*
598                  * Do the topo sort.  TopoSort need not examine constraints after
599                  * this one, since they must be for different locks.
600                  */
601                 if (!TopoSort(lock, constraints, i + 1,
602                                           waitOrders[nWaitOrders].procs))
603                         return false;
604                 nWaitOrders++;
605         }
606         return true;
607 }
608
609
610 /*
611  * TopoSort -- topological sort of a wait queue
612  *
613  * Generate a re-ordering of a lock's wait queue that satisfies given
614  * constraints about certain procs preceding others.  (Each such constraint
615  * is a fact of a partial ordering.)  Minimize rearrangement of the queue
616  * not needed to achieve the partial ordering.
617  *
618  * This is a lot simpler and slower than, for example, the topological sort
619  * algorithm shown in Knuth's Volume 1.  However, Knuth's method doesn't
620  * try to minimize the damage to the existing order.  In practice we are
621  * not likely to be working with more than a few constraints, so the apparent
622  * slowness of the algorithm won't really matter.
623  *
624  * The initial queue ordering is taken directly from the lock's wait queue.
625  * The output is an array of PGPROC pointers, of length equal to the lock's
626  * wait queue length (the caller is responsible for providing this space).
627  * The partial order is specified by an array of EDGE structs.  Each EDGE
628  * is one that we need to reverse, therefore the "waiter" must appear before
629  * the "blocker" in the output array.  The EDGE array may well contain
630  * edges associated with other locks; these should be ignored.
631  *
632  * Returns TRUE if able to build an ordering that satisfies all the
633  * constraints, FALSE if not (there are contradictory constraints).
634  */
635 static bool
636 TopoSort(LOCK *lock,
637                  EDGE *constraints,
638                  int nConstraints,
639                  PGPROC **ordering)             /* output argument */
640 {
641         PROC_QUEUE *waitQueue = &(lock->waitProcs);
642         int                     queue_size = waitQueue->size;
643         PGPROC     *proc;
644         int                     i,
645                                 j,
646                                 k,
647                                 last;
648
649         /* First, fill topoProcs[] array with the procs in their current order */
650         proc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
651         for (i = 0; i < queue_size; i++)
652         {
653                 topoProcs[i] = proc;
654                 proc = (PGPROC *) MAKE_PTR(proc->links.next);
655         }
656
657         /*
658          * Scan the constraints, and for each proc in the array, generate a
659          * count of the number of constraints that say it must be before
660          * something else, plus a list of the constraints that say it must be
661          * after something else. The count for the j'th proc is stored in
662          * beforeConstraints[j], and the head of its list in
663          * afterConstraints[j].  Each constraint stores its list link in
664          * constraints[i].link (note any constraint will be in just one list).
665          * The array index for the before-proc of the i'th constraint is
666          * remembered in constraints[i].pred.
667          */
668         MemSet(beforeConstraints, 0, queue_size * sizeof(int));
669         MemSet(afterConstraints, 0, queue_size * sizeof(int));
670         for (i = 0; i < nConstraints; i++)
671         {
672                 proc = constraints[i].waiter;
673                 /* Ignore constraint if not for this lock */
674                 if (proc->waitLock != lock)
675                         continue;
676                 /* Find the waiter proc in the array */
677                 for (j = queue_size; --j >= 0;)
678                 {
679                         if (topoProcs[j] == proc)
680                                 break;
681                 }
682                 Assert(j >= 0);                 /* should have found a match */
683                 /* Find the blocker proc in the array */
684                 proc = constraints[i].blocker;
685                 for (k = queue_size; --k >= 0;)
686                 {
687                         if (topoProcs[k] == proc)
688                                 break;
689                 }
690                 Assert(k >= 0);                 /* should have found a match */
691                 beforeConstraints[j]++; /* waiter must come before */
692                 /* add this constraint to list of after-constraints for blocker */
693                 constraints[i].pred = j;
694                 constraints[i].link = afterConstraints[k];
695                 afterConstraints[k] = i + 1;
696         }
697         /*--------------------
698          * Now scan the topoProcs array backwards.      At each step, output the
699          * last proc that has no remaining before-constraints, and decrease
700          * the beforeConstraints count of each of the procs it was constrained
701          * against.
702          * i = index of ordering[] entry we want to output this time
703          * j = search index for topoProcs[]
704          * k = temp for scanning constraint list for proc j
705          * last = last non-null index in topoProcs (avoid redundant searches)
706          *--------------------
707          */
708         last = queue_size - 1;
709         for (i = queue_size; --i >= 0;)
710         {
711                 /* Find next candidate to output */
712                 while (topoProcs[last] == NULL)
713                         last--;
714                 for (j = last; j >= 0; j--)
715                 {
716                         if (topoProcs[j] != NULL && beforeConstraints[j] == 0)
717                                 break;
718                 }
719                 /* If no available candidate, topological sort fails */
720                 if (j < 0)
721                         return false;
722                 /* Output candidate, and mark it done by zeroing topoProcs[] entry */
723                 ordering[i] = topoProcs[j];
724                 topoProcs[j] = NULL;
725                 /* Update beforeConstraints counts of its predecessors */
726                 for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link)
727                         beforeConstraints[constraints[k - 1].pred]--;
728         }
729
730         /* Done */
731         return true;
732 }
733
734 #ifdef DEBUG_DEADLOCK
735 static void
736 PrintLockQueue(LOCK *lock, const char *info)
737 {
738         PROC_QUEUE *waitQueue = &(lock->waitProcs);
739         int                     queue_size = waitQueue->size;
740         PGPROC     *proc;
741         int                     i;
742
743         printf("%s lock %lx queue ", info, MAKE_OFFSET(lock));
744         proc = (PGPROC *) MAKE_PTR(waitQueue->links.next);
745         for (i = 0; i < queue_size; i++)
746         {
747                 printf(" %d", proc->pid);
748                 proc = (PGPROC *) MAKE_PTR(proc->links.next);
749         }
750         printf("\n");
751         fflush(stdout);
752 }
753
754 #endif