threading.Thread.__init__(self)
def run(self):
- time.sleep(.1)
+ # The sleep isn't necessary, but is intended to give the blocking
+ # function in the main thread a chance at actually blocking before
+ # we unclog it. But if the sleep is longer than the timeout-based
+ # tests wait in their blocking functions, those tests will fail.
+ # So we give them much longer timeout values compared to the
+ # sleep here (I aimed at 10 seconds for blocking functions --
+ # they should never actually wait that long - they should make
+ # progress as soon as we call self.fn()).
+ time.sleep(0.1)
self.startedEvent.set()
self.fn(*self.args)
-# Execute a function that blocks, and in a seperate thread, a function that
+# Execute a function that blocks, and in a separate thread, a function that
# triggers the release. Returns the result of the blocking function.
+# Caution: block_func must guarantee to block until trigger_func is
+# called, and trigger_func must guarantee to change queue state so that
+# block_func can make enough progress to return. In particular, a
+# block_func that just raises an exception regardless of whether trigger_func
+# is called will lead to timing-dependent sporadic failures, and one of
+# those went rarely seen but undiagnosed for years. Now block_func
+# must be unexceptional. If block_func is supposed to raise an exception,
+# call _doExceptionalBlockingTest() instead.
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
+ result = block_func(*block_args)
+ # If block_func returned before our thread made the call, we failed!
+ if not t.startedEvent.isSet():
+ raise TestFailed("blocking function '%r' appeared not to block" %
+ block_func)
+ t.join(10) # make sure the thread terminates
+ if t.isAlive():
+ raise TestFailed("trigger function '%r' appeared to not return" %
+ trigger_func)
+ return result
+
+# Call this instead if block_func is supposed to raise an exception.
+def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
+ trigger_args, expected_exception_class):
+ t = _TriggerThread(trigger_func, trigger_args)
+ t.start()
try:
- return block_func(*block_args)
+ try:
+ block_func(*block_args)
+ except expected_exception_class:
+ raise
+ else:
+ raise TestFailed("expected exception of kind %r" %
+ expected_exception_class)
finally:
- # If we unblocked before our thread made the call, we failed!
- if not t.startedEvent.isSet():
- raise TestFailed("blocking function '%r' appeared not to block" %
- block_func)
- t.join(1) # make sure the thread terminates
+ t.join(10) # make sure the thread terminates
if t.isAlive():
raise TestFailed("trigger function '%r' appeared to not return" %
trigger_func)
+ if not t.startedEvent.isSet():
+ raise TestFailed("trigger thread ended but event never set")
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
# Test a failing timeout put
q.fail_next_put = True
try:
- _doBlockingTest(q.put, ("full", True, 0.2), q.get, ())
+ _doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
+ FailingQueueException)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
verify(q.empty(), "Queue should be empty")
q.fail_next_get = True
try:
- _doBlockingTest( q.get, (), q.put, ('empty',))
+ _doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
+ FailingQueueException)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
"Didn't seem to queue the correct data!")
for i in range(QUEUE_SIZE-1):
q.put(i)
+ verify(not q.empty(), "Queue should not be empty")
verify(not q.full(), "Queue should not be full")
q.put("last")
verify(q.full(), "Queue should be full")
except Queue.Full:
pass
try:
- q.put("full", timeout=0.1)
+ q.put("full", timeout=0.01)
raise TestFailed("Didn't appear to time-out with a full queue")
except Queue.Full:
pass
# Test a blocking put
_doBlockingTest(q.put, ("full",), q.get, ())
- _doBlockingTest(q.put, ("full", True, 0.2), q.get, ())
+ _doBlockingTest(q.put, ("full", True, 10), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
except Queue.Empty:
pass
try:
- q.get(timeout=0.1)
+ q.get(timeout=0.01)
raise TestFailed("Didn't appear to time-out with an empty queue")
except Queue.Empty:
pass
# Test a blocking get
_doBlockingTest(q.get, (), q.put, ('empty',))
- _doBlockingTest(q.get, (True, 0.2), q.put, ('empty',))
+ _doBlockingTest(q.get, (True, 10), q.put, ('empty',))
def test():
q = Queue.Queue(QUEUE_SIZE)