self.fail("trigger thread ended but event never set")
-class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
+class BaseQueueTestMixin(BlockingTestMixin):
def setUp(self):
self.cum = 0
self.cumlock = threading.Lock()
with self.assertRaises(queue.Full):
q.put_nowait(4)
-class QueueTest(BaseQueueTest):
+class QueueTest(BaseQueueTestMixin, unittest.TestCase):
type2test = queue.Queue
-class LifoQueueTest(BaseQueueTest):
+class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
type2test = queue.LifoQueue
-class PriorityQueueTest(BaseQueueTest):
+class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
type2test = queue.PriorityQueue