]> granicus.if.org Git - python/commitdiff
Issue #21755: Skip {Frozen,Source}_DeadlockAvoidanceTests tests when
authorBerker Peksag <berker.peksag@gmail.com>
Thu, 3 Jul 2014 03:25:10 +0000 (06:25 +0300)
committerBerker Peksag <berker.peksag@gmail.com>
Thu, 3 Jul 2014 03:25:10 +0000 (06:25 +0300)
Python is built without threads.

Lib/test/test_importlib/test_locks.py

index 4c01177990d7a408683ddc28606d6486b5bcaa44..df0af12d3886d65f152ddb65a01d8036af531f74 100644 (file)
@@ -52,74 +52,86 @@ else:
         pass
 
 
-@unittest.skipUnless(threading, "threads needed for this test")
-class DeadlockAvoidanceTests:
-
-    def setUp(self):
-        try:
-            self.old_switchinterval = sys.getswitchinterval()
-            sys.setswitchinterval(0.000001)
-        except AttributeError:
-            self.old_switchinterval = None
-
-    def tearDown(self):
-        if self.old_switchinterval is not None:
-            sys.setswitchinterval(self.old_switchinterval)
-
-    def run_deadlock_avoidance_test(self, create_deadlock):
-        NLOCKS = 10
-        locks = [self.LockType(str(i)) for i in range(NLOCKS)]
-        pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
-        if create_deadlock:
-            NTHREADS = NLOCKS
-        else:
-            NTHREADS = NLOCKS - 1
-        barrier = threading.Barrier(NTHREADS)
-        results = []
-        def _acquire(lock):
-            """Try to acquire the lock. Return True on success, False on deadlock."""
+if threading is not None:
+    class DeadlockAvoidanceTests:
+
+        def setUp(self):
             try:
-                lock.acquire()
-            except self.DeadlockError:
-                return False
+                self.old_switchinterval = sys.getswitchinterval()
+                sys.setswitchinterval(0.000001)
+            except AttributeError:
+                self.old_switchinterval = None
+
+        def tearDown(self):
+            if self.old_switchinterval is not None:
+                sys.setswitchinterval(self.old_switchinterval)
+
+        def run_deadlock_avoidance_test(self, create_deadlock):
+            NLOCKS = 10
+            locks = [self.LockType(str(i)) for i in range(NLOCKS)]
+            pairs = [(locks[i], locks[(i+1)%NLOCKS]) for i in range(NLOCKS)]
+            if create_deadlock:
+                NTHREADS = NLOCKS
             else:
-                return True
-        def f():
-            a, b = pairs.pop()
-            ra = _acquire(a)
-            barrier.wait()
-            rb = _acquire(b)
-            results.append((ra, rb))
-            if rb:
-                b.release()
-            if ra:
-                a.release()
-        lock_tests.Bunch(f, NTHREADS).wait_for_finished()
-        self.assertEqual(len(results), NTHREADS)
-        return results
-
-    def test_deadlock(self):
-        results = self.run_deadlock_avoidance_test(True)
-        # At least one of the threads detected a potential deadlock on its
-        # second acquire() call.  It may be several of them, because the
-        # deadlock avoidance mechanism is conservative.
-        nb_deadlocks = results.count((True, False))
-        self.assertGreaterEqual(nb_deadlocks, 1)
-        self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
-
-    def test_no_deadlock(self):
-        results = self.run_deadlock_avoidance_test(False)
-        self.assertEqual(results.count((True, False)), 0)
-        self.assertEqual(results.count((True, True)), len(results))
-
-
-DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
-                   for kind, splitinit in init.items()}
-
-(Frozen_DeadlockAvoidanceTests,
- Source_DeadlockAvoidanceTests
- ) = test_util.test_both(DeadlockAvoidanceTests,
-                         LockType=LOCK_TYPES, DeadlockError=DEADLOCK_ERRORS)
+                NTHREADS = NLOCKS - 1
+            barrier = threading.Barrier(NTHREADS)
+            results = []
+
+            def _acquire(lock):
+                """Try to acquire the lock. Return True on success,
+                False on deadlock."""
+                try:
+                    lock.acquire()
+                except self.DeadlockError:
+                    return False
+                else:
+                    return True
+
+            def f():
+                a, b = pairs.pop()
+                ra = _acquire(a)
+                barrier.wait()
+                rb = _acquire(b)
+                results.append((ra, rb))
+                if rb:
+                    b.release()
+                if ra:
+                    a.release()
+            lock_tests.Bunch(f, NTHREADS).wait_for_finished()
+            self.assertEqual(len(results), NTHREADS)
+            return results
+
+        def test_deadlock(self):
+            results = self.run_deadlock_avoidance_test(True)
+            # At least one of the threads detected a potential deadlock on its
+            # second acquire() call.  It may be several of them, because the
+            # deadlock avoidance mechanism is conservative.
+            nb_deadlocks = results.count((True, False))
+            self.assertGreaterEqual(nb_deadlocks, 1)
+            self.assertEqual(results.count((True, True)), len(results) - nb_deadlocks)
+
+        def test_no_deadlock(self):
+            results = self.run_deadlock_avoidance_test(False)
+            self.assertEqual(results.count((True, False)), 0)
+            self.assertEqual(results.count((True, True)), len(results))
+
+
+    DEADLOCK_ERRORS = {kind: splitinit._bootstrap._DeadlockError
+                       for kind, splitinit in init.items()}
+
+    (Frozen_DeadlockAvoidanceTests,
+     Source_DeadlockAvoidanceTests
+     ) = test_util.test_both(DeadlockAvoidanceTests,
+                             LockType=LOCK_TYPES,
+                             DeadlockError=DEADLOCK_ERRORS)
+else:
+    DEADLOCK_ERRORS = {}
+
+    class Frozen_DeadlockAvoidanceTests(unittest.TestCase):
+        pass
+
+    class Source_DeadlockAvoidanceTests(unittest.TestCase):
+        pass
 
 
 class LifetimeTests: