]> granicus.if.org Git - python/commitdiff
Issue #20556: Used specific assert methods in threading tests.
authorSerhiy Storchaka <storchaka@gmail.com>
Mon, 14 Mar 2016 08:28:59 +0000 (10:28 +0200)
committerSerhiy Storchaka <storchaka@gmail.com>
Mon, 14 Mar 2016 08:28:59 +0000 (10:28 +0200)
Lib/test/test_dummy_thread.py
Lib/test/test_threading.py
Lib/test/test_threading_local.py

index d9bdd3c3c082d2dc9693999f5baa9eaeb3e8829b..29a8531558e58d3bd057a23f54c8853b1e5aadea 100644 (file)
@@ -24,14 +24,14 @@ class LockTests(unittest.TestCase):
 
     def test_initlock(self):
         #Make sure locks start locked
-        self.assertTrue(not self.lock.locked(),
+        self.assertFalse(self.lock.locked(),
                         "Lock object is not initialized unlocked.")
 
     def test_release(self):
         # Test self.lock.release()
         self.lock.acquire()
         self.lock.release()
-        self.assertTrue(not self.lock.locked(),
+        self.assertFalse(self.lock.locked(),
                         "Lock object did not release properly.")
 
     def test_improper_release(self):
@@ -46,7 +46,7 @@ class LockTests(unittest.TestCase):
     def test_cond_acquire_fail(self):
         #Test acquiring locked lock returns False
         self.lock.acquire(0)
-        self.assertTrue(not self.lock.acquire(0),
+        self.assertFalse(self.lock.acquire(0),
                         "Conditional acquiring of a locked lock incorrectly "
                          "succeeded.")
 
@@ -58,9 +58,9 @@ class LockTests(unittest.TestCase):
 
     def test_uncond_acquire_return_val(self):
         #Make sure that an unconditional locking returns True.
-        self.assertTrue(self.lock.acquire(1) is True,
+        self.assertIs(self.lock.acquire(1), True,
                         "Unconditional locking did not return True.")
-        self.assertTrue(self.lock.acquire() is True)
+        self.assertIs(self.lock.acquire(), True)
 
     def test_uncond_acquire_blocking(self):
         #Make sure that unconditional acquiring of a locked lock blocks.
@@ -80,7 +80,7 @@ class LockTests(unittest.TestCase):
         end_time = int(time.time())
         if test_support.verbose:
             print "done"
-        self.assertTrue((end_time - start_time) >= DELAY,
+        self.assertGreaterEqual(end_time - start_time, DELAY,
                         "Blocking by unconditional acquiring failed.")
 
 class MiscTests(unittest.TestCase):
@@ -94,7 +94,7 @@ class MiscTests(unittest.TestCase):
         #Test sanity of _thread.get_ident()
         self.assertIsInstance(_thread.get_ident(), int,
                               "_thread.get_ident() returned a non-integer")
-        self.assertTrue(_thread.get_ident() != 0,
+        self.assertNotEqual(_thread.get_ident(), 0,
                         "_thread.get_ident() returned 0")
 
     def test_LockType(self):
@@ -164,7 +164,7 @@ class ThreadTests(unittest.TestCase):
         time.sleep(DELAY)
         if test_support.verbose:
             print 'done'
-        self.assertTrue(testing_queue.qsize() == thread_count,
+        self.assertEqual(testing_queue.qsize(), thread_count,
                         "Not all %s threads executed properly after %s sec." %
                         (thread_count, DELAY))
 
index 44f2c573536cee900702bfc628b0b942ae04b52f..212bd1aa7514987c3d6884f03f10462ccdc3758d 100644 (file)
@@ -51,7 +51,7 @@ class TestThread(threading.Thread):
                 self.nrunning.inc()
                 if verbose:
                     print self.nrunning.get(), 'tasks are running'
-                self.testcase.assertTrue(self.nrunning.get() <= 3)
+                self.testcase.assertLessEqual(self.nrunning.get(), 3)
 
             time.sleep(delay)
             if verbose:
@@ -59,7 +59,7 @@ class TestThread(threading.Thread):
 
             with self.mutex:
                 self.nrunning.dec()
-                self.testcase.assertTrue(self.nrunning.get() >= 0)
+                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
                 if verbose:
                     print '%s is finished. %d tasks are running' % (
                         self.name, self.nrunning.get())
@@ -92,25 +92,25 @@ class ThreadTests(BaseTestCase):
         for i in range(NUMTASKS):
             t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
             threads.append(t)
-            self.assertEqual(t.ident, None)
-            self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
+            self.assertIsNone(t.ident)
+            self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, initial\)>$')
             t.start()
 
         if verbose:
             print 'waiting for all tasks to complete'
         for t in threads:
             t.join(NUMTASKS)
-            self.assertTrue(not t.is_alive())
+            self.assertFalse(t.is_alive())
             self.assertNotEqual(t.ident, 0)
-            self.assertFalse(t.ident is None)
-            self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
+            self.assertIsNotNone(t.ident)
+            self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, \w+ -?\d+\)>$')
         if verbose:
             print 'all tasks done'
         self.assertEqual(numrunning.get(), 0)
 
     def test_ident_of_no_threading_threads(self):
         # The ident still must work for the main thread and dummy threads.
-        self.assertFalse(threading.currentThread().ident is None)
+        self.assertIsNotNone(threading.currentThread().ident)
         def f():
             ident.append(threading.currentThread().ident)
             done.set()
@@ -118,7 +118,7 @@ class ThreadTests(BaseTestCase):
         ident = []
         thread.start_new_thread(f, ())
         done.wait()
-        self.assertFalse(ident[0] is None)
+        self.assertIsNotNone(ident[0])
         # Kill the "immortal" _DummyThread
         del threading._active[ident[0]]
 
@@ -236,7 +236,7 @@ class ThreadTests(BaseTestCase):
         self.assertTrue(ret)
         if verbose:
             print "    verifying worker hasn't exited"
-        self.assertTrue(not t.finished)
+        self.assertFalse(t.finished)
         if verbose:
             print "    attempting to raise asynch exception in worker"
         result = set_async_exc(ctypes.c_long(t.id), exception)
index b161315c6fba20e4bf8b7ce9384aa1fa956606a6..e53400c40e3696e7401ccc75f9f7bd4ce5dbada7 100644 (file)
@@ -196,7 +196,7 @@ class ThreadLocalTest(unittest.TestCase, BaseLocalTest):
         wr = weakref.ref(x)
         del x
         gc.collect()
-        self.assertIs(wr(), None)
+        self.assertIsNone(wr())
 
 class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest):
     _local = _threading_local.local