# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import itertools
-import queue
import random
import threading
import time
import weakref
from test import support
-
-try:
- import _queue
-except ImportError:
- _queue = None
+py_queue = support.import_fresh_module('queue', blocked=['_queue'])
+c_queue = support.import_fresh_module('queue', fresh=['_queue'])
+need_c_queue = unittest.skipUnless(c_queue, "No _queue module found")
QUEUE_SIZE = 5
try:
q.put(full, block=0)
self.fail("Didn't appear to block with a full queue")
- except queue.Full:
+ except self.queue.Full:
pass
try:
q.put(full, timeout=0.01)
self.fail("Didn't appear to time-out with a full queue")
- except queue.Full:
+ except self.queue.Full:
pass
# Test a blocking put
self.do_blocking_test(q.put, (full,), q.get, ())
try:
q.get(block=0)
self.fail("Didn't appear to block with an empty queue")
- except queue.Empty:
+ except self.queue.Empty:
pass
try:
q.get(timeout=0.01)
self.fail("Didn't appear to time-out with an empty queue")
- except queue.Empty:
+ except self.queue.Empty:
pass
# Test a blocking get
self.do_blocking_test(q.get, (), q.put, ('empty',))
q = self.type2test(QUEUE_SIZE)
for i in range(QUEUE_SIZE):
q.put_nowait(1)
- with self.assertRaises(queue.Full):
+ with self.assertRaises(self.queue.Full):
q.put_nowait(1)
for i in range(QUEUE_SIZE):
q.get_nowait()
- with self.assertRaises(queue.Empty):
+ with self.assertRaises(self.queue.Empty):
q.get_nowait()
def test_shrinking_queue(self):
q.put(1)
q.put(2)
q.put(3)
- with self.assertRaises(queue.Full):
+ with self.assertRaises(self.queue.Full):
q.put_nowait(4)
self.assertEqual(q.qsize(), 3)
q.maxsize = 2 # shrink the queue
- with self.assertRaises(queue.Full):
+ with self.assertRaises(self.queue.Full):
q.put_nowait(4)
-class QueueTest(BaseQueueTestMixin, unittest.TestCase):
- type2test = queue.Queue
+class QueueTest(BaseQueueTestMixin):
+
+ def setUp(self):
+ self.type2test = self.queue.Queue
+ super().setUp()
+
+class PyQueueTest(QueueTest, unittest.TestCase):
+ queue = py_queue
+
+
+@need_c_queue
+class CQueueTest(QueueTest, unittest.TestCase):
+ queue = c_queue
+
+
+class LifoQueueTest(BaseQueueTestMixin):
+
+ def setUp(self):
+ self.type2test = self.queue.LifoQueue
+ super().setUp()
+
+
+class PyLifoQueueTest(LifoQueueTest, unittest.TestCase):
+ queue = py_queue
+
+
+@need_c_queue
+class CLifoQueueTest(LifoQueueTest, unittest.TestCase):
+ queue = c_queue
+
+
+class PriorityQueueTest(BaseQueueTestMixin):
+
+ def setUp(self):
+ self.type2test = self.queue.PriorityQueue
+ super().setUp()
+
-class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
- type2test = queue.LifoQueue
+class PyPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
+ queue = py_queue
-class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
- type2test = queue.PriorityQueue
+@need_c_queue
+class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
+ queue = c_queue
# A Queue subclass that can provoke failure at a moment's notice :)
-class FailingQueueException(Exception):
- pass
-
-class FailingQueue(queue.Queue):
- def __init__(self, *args):
- self.fail_next_put = False
- self.fail_next_get = False
- queue.Queue.__init__(self, *args)
- def _put(self, item):
- if self.fail_next_put:
- self.fail_next_put = False
- raise FailingQueueException("You Lose")
- return queue.Queue._put(self, item)
- def _get(self):
- if self.fail_next_get:
- self.fail_next_get = False
- raise FailingQueueException("You Lose")
- return queue.Queue._get(self)
-
-class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
+class FailingQueueException(Exception): pass
+
+class FailingQueueTest(BlockingTestMixin):
+
+ def setUp(self):
+
+ Queue = self.queue.Queue
+
+ class FailingQueue(Queue):
+ def __init__(self, *args):
+ self.fail_next_put = False
+ self.fail_next_get = False
+ Queue.__init__(self, *args)
+ def _put(self, item):
+ if self.fail_next_put:
+ self.fail_next_put = False
+ raise FailingQueueException("You Lose")
+ return Queue._put(self, item)
+ def _get(self):
+ if self.fail_next_get:
+ self.fail_next_get = False
+ raise FailingQueueException("You Lose")
+ return Queue._get(self)
+
+ self.FailingQueue = FailingQueue
+
+ super().setUp()
def failing_queue_test(self, q):
if q.qsize():
self.assertTrue(not q.qsize(), "Queue should be empty")
def test_failing_queue(self):
+
# Test to make sure a queue is functioning correctly.
# Done twice to the same instance.
- q = FailingQueue(QUEUE_SIZE)
+ q = self.FailingQueue(QUEUE_SIZE)
self.failing_queue_test(q)
self.failing_queue_test(q)
+
+class PyFailingQueueTest(FailingQueueTest, unittest.TestCase):
+ queue = py_queue
+
+
+@need_c_queue
+class CFailingQueueTest(FailingQueueTest, unittest.TestCase):
+ queue = c_queue
+
+
class BaseSimpleQueueTest:
def setUp(self):
while True:
try:
val = q.get(block=False)
- except queue.Empty:
+ except self.queue.Empty:
time.sleep(1e-5)
else:
break
while True:
try:
val = q.get(timeout=1e-5)
- except queue.Empty:
+ except self.queue.Empty:
pass
else:
break
self.assertTrue(q.empty())
self.assertEqual(q.qsize(), 0)
- with self.assertRaises(queue.Empty):
+ with self.assertRaises(self.queue.Empty):
q.get(block=False)
- with self.assertRaises(queue.Empty):
+ with self.assertRaises(self.queue.Empty):
q.get(timeout=1e-3)
- with self.assertRaises(queue.Empty):
+ with self.assertRaises(self.queue.Empty):
q.get_nowait()
self.assertTrue(q.empty())
self.assertEqual(q.qsize(), 0)
class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
- type2test = queue._PySimpleQueue
+ queue = py_queue
+ def setUp(self):
+ self.type2test = self.queue._PySimpleQueue
+ super().setUp()
-@unittest.skipIf(_queue is None, "No _queue module found")
+
+@need_c_queue
class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
+ queue = c_queue
+
def setUp(self):
- self.type2test = _queue.SimpleQueue
+ self.type2test = self.queue.SimpleQueue
super().setUp()
def test_is_default(self):
- self.assertIs(self.type2test, queue.SimpleQueue)
+ self.assertIs(self.type2test, self.queue.SimpleQueue)
+ self.assertIs(self.type2test, self.queue.SimpleQueue)
def test_reentrancy(self):
# bpo-14976: put() may be called reentrantly in an asynchronous