]> granicus.if.org Git - python/commitdiff
Make time.sleep calls go to 0 for common testing.
authorBrett Cannon <bcannon@gmail.com>
Wed, 30 Apr 2003 03:03:37 +0000 (03:03 +0000)
committerBrett Cannon <bcannon@gmail.com>
Wed, 30 Apr 2003 03:03:37 +0000 (03:03 +0000)
Lib/test/test_dummy_thread.py
Lib/test/test_dummy_threading.py

index 7be430ce358cf245dda4d6e56ea92deb6f333255..7237670cd16b36cda962e35dd7f0b6d99f7409ad 100644 (file)
@@ -12,6 +12,8 @@ import random
 import unittest
 from test import test_support
 
+DELAY = 0 # Set > 0 when testing a module other than dummy_thread, such as
+          # the 'thread' module.
 
 class LockTests(unittest.TestCase):
     """Test lock objects."""
@@ -67,18 +69,17 @@ class LockTests(unittest.TestCase):
             to_unlock.release()
 
         self.lock.acquire()
-        delay = 1  #In seconds
         start_time = int(time.time())
-        _thread.start_new_thread(delay_unlock,(self.lock, delay))
+        _thread.start_new_thread(delay_unlock,(self.lock, DELAY))
         if test_support.verbose:
             print
             print "*** Waiting for thread to release the lock "\
-            "(approx. %s sec.) ***" % delay
+            "(approx. %s sec.) ***" % DELAY
         self.lock.acquire()
         end_time = int(time.time())
         if test_support.verbose:
             print "done"
-        self.failUnless((end_time - start_time) >= delay,
+        self.failUnless((end_time - start_time) >= DELAY,
                         "Blocking by unconditional acquiring failed.")
 
 class MiscTests(unittest.TestCase):
@@ -134,26 +135,30 @@ class ThreadTests(unittest.TestCase):
             queue.put(_thread.get_ident())
 
         thread_count = 5
-        delay = 1.5
         testing_queue = Queue.Queue(thread_count)
         if test_support.verbose:
             print
             print "*** Testing multiple thread creation "\
-            "(will take approx. %s to %s sec.) ***" % (delay, thread_count)
+            "(will take approx. %s to %s sec.) ***" % (DELAY, thread_count)
         for count in xrange(thread_count):
+            if DELAY:
+                local_delay = round(random.random(), 1)
+            else:
+                local_delay = 0
             _thread.start_new_thread(queue_mark,
-                                     (testing_queue, round(random.random(), 1)))
-        time.sleep(delay)
+                                     (testing_queue, local_delay))
+        time.sleep(DELAY)
         if test_support.verbose:
             print 'done'
         self.failUnless(testing_queue.qsize() == thread_count,
                         "Not all %s threads executed properly after %s sec." %
-                        (thread_count, delay))
+                        (thread_count, DELAY))
 
 def test_main(imported_module=None):
-    global _thread
+    global _thread, DELAY
     if imported_module:
         _thread = imported_module
+        DELAY = 2
     if test_support.verbose:
         print
         print "*** Using %s as _thread module ***" % _thread
index e377230ef5d8fc9ea08768744349f0e37dd0581a..3724f1e5a4ee27a761c32065dd6e016353cea192 100644 (file)
@@ -12,7 +12,10 @@ class TestThread(_threading.Thread):
 
     def run(self):
         global running
-        delay = random.random() * 2
+        # Uncomment if testing another module, such as the real 'threading'
+        # module.
+        #delay = random.random() * 2
+        delay = 0
         if verbose:
             print 'task', self.getName(), 'will run for', delay, 'sec'
         sema.acquire()