Return a task object.
"""
+ self._check_closed()
task = tasks.Task(coro, loop=self)
if task._source_traceback:
del task._source_traceback[-1]
if (coroutines.iscoroutine(callback)
or coroutines.iscoroutinefunction(callback)):
raise TypeError("coroutines cannot be used with call_at()")
+ self._check_closed()
if self._debug:
self._assert_is_current_event_loop()
timer = events.TimerHandle(when, callback, args, self)
raise TypeError("coroutines cannot be used with call_soon()")
if self._debug and check_loop:
self._assert_is_current_event_loop()
+ self._check_closed()
handle = events.Handle(callback, args, self)
if handle._source_traceback:
del handle._source_traceback[-1]
if (coroutines.iscoroutine(callback)
or coroutines.iscoroutinefunction(callback)):
raise TypeError("coroutines cannot be used with run_in_executor()")
+ self._check_closed()
if isinstance(callback, events.Handle):
assert not args
assert not isinstance(callback, events.TimerHandle)
def tearDown(self):
# just in case if we have transport close callbacks
- test_utils.run_briefly(self.loop)
+ if not self.loop.is_closed():
+ test_utils.run_briefly(self.loop)
self.loop.close()
gc.collect()
with self.assertRaises(RuntimeError):
self.loop.run_until_complete(coro)
+ def test_close(self):
+ self.loop.close()
+
+ @asyncio.coroutine
+ def test():
+ pass
+
+ func = lambda: False
+ coro = test()
+ self.addCleanup(coro.close)
+
+ # operation blocked when the loop is closed
+ with self.assertRaises(RuntimeError):
+ self.loop.run_forever()
+ with self.assertRaises(RuntimeError):
+ fut = asyncio.Future(loop=self.loop)
+ self.loop.run_until_complete(fut)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_soon(func)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_soon_threadsafe(func)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_later(1.0, func)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_at(self.loop.time() + .0, func)
+ with self.assertRaises(RuntimeError):
+ self.loop.run_in_executor(None, func)
+ with self.assertRaises(RuntimeError):
+ self.loop.create_task(coro)
+ with self.assertRaises(RuntimeError):
+ self.loop.add_signal_handler(signal.SIGTERM, func)
+
class SubprocessTestsMixin: