q = self.Queue(maxsize=1)
q.put(NotSerializable())
q.put(True)
- self.assertEqual(q.qsize(), 1)
+ try:
+ self.assertEqual(q.qsize(), 1)
+ except NotImplementedError:
+ # qsize is not available on all platform as it
+ # relies on sem_getvalue
+ pass
# bpo-30595: use a timeout of 1 second for slow buildbots
self.assertTrue(q.get(timeout=1.0))
# Check that the size of the queue is correct
- self.assertEqual(q.qsize(), 0)
+ self.assertTrue(q.empty())
close_queue(q)
def test_queue_feeder_on_queue_feeder_error(self):