self.nrunning = nrunning
def run(self):
- delay = random.random() * 2
+ delay = random.random() / 10000.0
if verbose:
- print 'task', self.getName(), 'will run for', delay, 'sec'
+ print 'task %s will run for %.1f usec' % (
+ self.getName(), delay * 1e6)
- self.sema.acquire()
+ with self.sema:
+ with self.mutex:
+ self.nrunning.inc()
+ if verbose:
+ print self.nrunning.get(), 'tasks are running'
+ self.testcase.assert_(self.nrunning.get() <= 3)
- self.mutex.acquire()
- self.nrunning.inc()
- if verbose:
- print self.nrunning.get(), 'tasks are running'
- self.testcase.assert_(self.nrunning.get() <= 3)
- self.mutex.release()
-
- time.sleep(delay)
- if verbose:
- print 'task', self.getName(), 'done'
-
- self.mutex.acquire()
- self.nrunning.dec()
- self.testcase.assert_(self.nrunning.get() >= 0)
- if verbose:
- print self.getName(), 'is finished.', self.nrunning.get(), \
- 'tasks are running'
- self.mutex.release()
+ time.sleep(delay)
+ if verbose:
+ print 'task', self.getName(), 'done'
- self.sema.release()
+ with self.mutex:
+ self.nrunning.dec()
+ self.testcase.assert_(self.nrunning.get() >= 0)
+ if verbose:
+ print '%s is finished. %d tasks are running' % (
+ self.getName(), self.nrunning.get())
class ThreadTests(unittest.TestCase):
rc = subprocess.call([sys.executable, "-c", """if 1:
import ctypes, sys, time, thread
+ # This lock is used as a simple event variable.
+ ready = thread.allocate_lock()
+ ready.acquire()
+
# Module globals are cleared before __del__ is run
# So we save the functions in class dict
class C:
def waitingThread():
x = C()
+ ready.release()
time.sleep(100)
thread.start_new_thread(waitingThread, ())
- time.sleep(1) # be sure the other thread is waiting
+ ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
"""])
self.assertEqual(rc, 42)
# threading.enumerate() after it has been join()ed.
enum = threading.enumerate
old_interval = sys.getcheckinterval()
- sys.setcheckinterval(1)
try:
- for i in xrange(1, 1000):
+ for i in xrange(1, 100):
+ # Try a couple times at each thread-switching interval
+ # to get more interleavings.
+ sys.setcheckinterval(i // 5)
t = threading.Thread(target=lambda: None)
t.start()
t.join()