QUEUE_SIZE = 5
-def qfull(q):
- return q.maxsize > 0 and q.qsize() == q.maxsize
-
-
# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
def __init__(self, fn, args):
self.cumlock = threading.Lock()
def simple_queue_test(self, q):
- if q.qsize():
+ if not q.empty():
raise RuntimeError, "Call this function with an empty queue"
# I guess we better check things actually queue correctly a little :)
q.put(111)
"Didn't seem to queue the correct data!")
for i in range(QUEUE_SIZE-1):
q.put(i)
- self.assert_(q.qsize(), "Queue should not be empty")
- self.assert_(not qfull(q), "Queue should not be full")
+ self.assert_(not q.empty(), "Queue should not be empty")
+ self.assert_(not q.full(), "Queue should not be full")
q.put("last")
- self.assert_(qfull(q), "Queue should be full")
+ self.assert_(q.full(), "Queue should be full")
try:
q.put("full", block=0)
self.fail("Didn't appear to block with a full queue")
# Empty it
for i in range(QUEUE_SIZE):
q.get()
- self.assert_(not q.qsize(), "Queue should be empty")
+ self.assert_(q.empty(), "Queue should be empty")
try:
q.get(block=0)
self.fail("Didn't appear to block with an empty queue")
class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
def failing_queue_test(self, q):
- if q.qsize():
+ if not q.empty():
raise RuntimeError, "Call this function with an empty queue"
for i in range(QUEUE_SIZE-1):
q.put(i)
except FailingQueueException:
pass
q.put("last")
- self.assert_(qfull(q), "Queue should be full")
+ self.assert_(q.full(), "Queue should be full")
# Test a failing blocking put
q.fail_next_put = True
try:
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
q.put("last")
- self.assert_(qfull(q), "Queue should be full")
+ self.assert_(q.full(), "Queue should be full")
q.get()
- self.assert_(not qfull(q), "Queue should not be full")
+ self.assert_(not q.full(), "Queue should not be full")
q.put("last")
- self.assert_(qfull(q), "Queue should be full")
+ self.assert_(q.full(), "Queue should be full")
# Test a blocking put
self.do_blocking_test(q.put, ("full",), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
- self.assert_(not q.qsize(), "Queue should be empty")
+ self.assert_(q.empty(), "Queue should be empty")
q.put("first")
q.fail_next_get = True
try:
self.fail("The queue didn't fail when it should have")
except FailingQueueException:
pass
- self.assert_(q.qsize(), "Queue should not be empty")
+ self.assert_(not q.empty(), "Queue should not be empty")
q.fail_next_get = True
try:
q.get(timeout=0.1)
self.fail("The queue didn't fail when it should have")
except FailingQueueException:
pass
- self.assert_(q.qsize(), "Queue should not be empty")
+ self.assert_(not q.empty(), "Queue should not be empty")
q.get()
- self.assert_(not q.qsize(), "Queue should be empty")
+ self.assert_(q.empty(), "Queue should be empty")
q.fail_next_get = True
try:
self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
except FailingQueueException:
pass
# put succeeded, but get failed.
- self.assert_(q.qsize(), "Queue should not be empty")
+ self.assert_(not q.empty(), "Queue should not be empty")
q.get()
- self.assert_(not q.qsize(), "Queue should be empty")
+ self.assert_(q.empty(), "Queue should be empty")
def test_failing_queue(self):
# Test to make sure a queue is functioning correctly.