class TestCase(unittest.TestCase):
+ @staticmethod
+ def close_loop(loop):
+ executor = loop._default_executor
+ if executor is not None:
+ executor.shutdown(wait=True)
+ loop.close()
+
def set_event_loop(self, loop, *, cleanup=True):
assert loop is not None
# ensure that the event loop is passed explicitly in asyncio
events.set_event_loop(None)
if cleanup:
- self.addCleanup(loop.close)
+ self.addCleanup(self.close_loop, loop)
def new_test_loop(self, gen=None):
loop = TestLoop(gen)
self.assertTrue(asyncio.isfuture(f2))
self.assertEqual(res, 'oi')
self.assertNotEqual(ident, threading.get_ident())
+ ex.shutdown(wait=True)
def test_wrap_future_future(self):
f1 = self._new_future(loop=self.loop)
f1 = ex.submit(run, 'oi')
f2 = asyncio.wrap_future(f1)
self.assertIs(self.loop, f2._loop)
+ ex.shutdown(wait=True)
def test_wrap_future_cancel(self):
f1 = concurrent.futures.Future()
self.loop._loop_self_reading)
def test_loop_self_reading_exception(self):
- self.loop.close = mock.Mock()
self.loop.call_exception_handler = mock.Mock()
self.proactor.recv.side_effect = OSError()
self.loop._loop_self_reading()