From 82733fac8dca465a1f0eb8e82b3b3f53cc6da145 Mon Sep 17 00:00:00 2001 From: Senthil Kumaran Date: Thu, 8 Sep 2016 02:46:22 -0700 Subject: [PATCH] Issue11551 - Increase the test coverage of _dummy_thread module to 100%. Initial patch contributed by Denver Coneybeare. --- Lib/test/test_dummy_thread.py | 156 +++++++++++++++++++++++++--------- 1 file changed, 115 insertions(+), 41 deletions(-) diff --git a/Lib/test/test_dummy_thread.py b/Lib/test/test_dummy_thread.py index a6a2856511..bb4bfe7a4d 100644 --- a/Lib/test/test_dummy_thread.py +++ b/Lib/test/test_dummy_thread.py @@ -1,19 +1,13 @@ -"""Generic thread tests. - -Meant to be used by dummy_thread and thread. To allow for different modules -to be used, test_main() can be called with the module to use as the thread -implementation as its sole argument. - -""" import _dummy_thread as _thread import time import queue import random import unittest from test import support +from unittest import mock + +DELAY = 0 -DELAY = 0 # Set > 0 when testing a module other than _dummy_thread, such as - # the '_thread' module. class LockTests(unittest.TestCase): """Test lock objects.""" @@ -34,6 +28,12 @@ class LockTests(unittest.TestCase): self.assertFalse(self.lock.locked(), "Lock object did not release properly.") + def test_LockType_context_manager(self): + with _thread.LockType(): + pass + self.assertFalse(self.lock.locked(), + "Acquired Lock was not released") + def test_improper_release(self): #Make sure release of an unlocked thread raises RuntimeError self.assertRaises(RuntimeError, self.lock.release) @@ -83,39 +83,72 @@ class LockTests(unittest.TestCase): self.assertGreaterEqual(end_time - start_time, DELAY, "Blocking by unconditional acquiring failed.") + @mock.patch('time.sleep') + def test_acquire_timeout(self, mock_sleep): + """Test invoking acquire() with a positive timeout when the lock is + already acquired. Ensure that time.sleep() is invoked with the given + timeout and that False is returned.""" + + self.lock.acquire() + retval = self.lock.acquire(waitflag=0, timeout=1) + self.assertTrue(mock_sleep.called) + mock_sleep.assert_called_once_with(1) + self.assertEqual(retval, False) + + def test_lock_representation(self): + self.lock.acquire() + self.assertIn("locked", repr(self.lock)) + self.lock.release() + self.assertIn("unlocked", repr(self.lock)) + + class MiscTests(unittest.TestCase): """Miscellaneous tests.""" def test_exit(self): - #Make sure _thread.exit() raises SystemExit self.assertRaises(SystemExit, _thread.exit) def test_ident(self): - #Test sanity of _thread.get_ident() self.assertIsInstance(_thread.get_ident(), int, "_thread.get_ident() returned a non-integer") self.assertNotEqual(_thread.get_ident(), 0, "_thread.get_ident() returned 0") def test_LockType(self): - #Make sure _thread.LockType is the same type as _thread.allocate_locke() self.assertIsInstance(_thread.allocate_lock(), _thread.LockType, "_thread.LockType is not an instance of what " "is returned by _thread.allocate_lock()") + def test_set_sentinel(self): + self.assertIsInstance(_thread._set_sentinel(), _thread.LockType, + "_thread._set_sentinel() did not return a " + "LockType instance.") + def test_interrupt_main(self): #Calling start_new_thread with a function that executes interrupt_main # should raise KeyboardInterrupt upon completion. def call_interrupt(): _thread.interrupt_main() - self.assertRaises(KeyboardInterrupt, _thread.start_new_thread, - call_interrupt, tuple()) + + self.assertRaises(KeyboardInterrupt, + _thread.start_new_thread, + call_interrupt, + tuple()) def test_interrupt_in_main(self): - # Make sure that if interrupt_main is called in main threat that - # KeyboardInterrupt is raised instantly. self.assertRaises(KeyboardInterrupt, _thread.interrupt_main) + def test_stack_size_None(self): + retval = _thread.stack_size(None) + self.assertEqual(retval, 0) + + def test_stack_size_not_None(self): + with self.assertRaises(_thread.error) as cm: + _thread.stack_size("") + self.assertEqual(cm.exception.args[0], + "setting thread stack size not supported") + + class ThreadTests(unittest.TestCase): """Test thread creation.""" @@ -129,31 +162,43 @@ class ThreadTests(unittest.TestCase): _thread.start_new_thread(arg_tester, (testing_queue, True, True)) result = testing_queue.get() self.assertTrue(result[0] and result[1], - "Argument passing for thread creation using tuple failed") - _thread.start_new_thread(arg_tester, tuple(), {'queue':testing_queue, - 'arg1':True, 'arg2':True}) + "Argument passing for thread creation " + "using tuple failed") + + _thread.start_new_thread( + arg_tester, + tuple(), + {'queue':testing_queue, 'arg1':True, 'arg2':True}) + result = testing_queue.get() self.assertTrue(result[0] and result[1], - "Argument passing for thread creation using kwargs failed") - _thread.start_new_thread(arg_tester, (testing_queue, True), {'arg2':True}) + "Argument passing for thread creation " + "using kwargs failed") + + _thread.start_new_thread( + arg_tester, + (testing_queue, True), + {'arg2':True}) + result = testing_queue.get() self.assertTrue(result[0] and result[1], "Argument passing for thread creation using both tuple" " and kwargs failed") - def test_multi_creation(self): - #Make sure multiple threads can be created. + def test_multi_thread_creation(self): def queue_mark(queue, delay): - """Wait for ``delay`` seconds and then put something into ``queue``""" time.sleep(delay) queue.put(_thread.get_ident()) thread_count = 5 testing_queue = queue.Queue(thread_count) + if support.verbose: print() - print("*** Testing multiple thread creation "\ - "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count)) + print("*** Testing multiple thread creation " + "(will take approx. %s to %s sec.) ***" % ( + DELAY, thread_count)) + for count in range(thread_count): if DELAY: local_delay = round(random.random(), 1) @@ -165,18 +210,47 @@ class ThreadTests(unittest.TestCase): if support.verbose: print('done') self.assertEqual(testing_queue.qsize(), thread_count, - "Not all %s threads executed properly after %s sec." % - (thread_count, DELAY)) - -def test_main(imported_module=None): - global _thread, DELAY - if imported_module: - _thread = imported_module - DELAY = 2 - if support.verbose: - print() - print("*** Using %s as _thread module ***" % _thread) - support.run_unittest(LockTests, MiscTests, ThreadTests) - -if __name__ == '__main__': - test_main() + "Not all %s threads executed properly " + "after %s sec." % (thread_count, DELAY)) + + def test_args_not_tuple(self): + """ + Test invoking start_new_thread() with a non-tuple value for "args". + Expect TypeError with a meaningful error message to be raised. + """ + with self.assertRaises(TypeError) as cm: + _thread.start_new_thread(mock.Mock(), []) + self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple") + + def test_kwargs_not_dict(self): + """ + Test invoking start_new_thread() with a non-dict value for "kwargs". + Expect TypeError with a meaningful error message to be raised. + """ + with self.assertRaises(TypeError) as cm: + _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[]) + self.assertEqual(cm.exception.args[0], "3rd arg must be a dict") + + def test_SystemExit(self): + """ + Test invoking start_new_thread() with a function that raises + SystemExit. + The exception should be discarded. + """ + func = mock.Mock(side_effect=SystemExit()) + try: + _thread.start_new_thread(func, tuple()) + except SystemExit: + self.fail("start_new_thread raised SystemExit.") + + @mock.patch('traceback.print_exc') + def test_RaiseException(self, mock_print_exc): + """ + Test invoking start_new_thread() with a function that raises exception. + + The exception should be discarded and the traceback should be printed + via traceback.print_exc() + """ + func = mock.Mock(side_effect=Exception) + _thread.start_new_thread(func, tuple()) + self.assertTrue(mock_print_exc.called) -- 2.40.0