self.assertRaises(TypeError,
self.loop.run_until_complete, 'blah')
+ def test_run_until_complete_loop(self):
+ task = asyncio.Future(loop=self.loop)
+ other_loop = self.new_test_loop()
+ self.assertRaises(ValueError,
+ other_loop.run_until_complete, task)
+
def test_subprocess_exec_invalid_args(self):
args = [sys.executable, '-c', 'pass']
self.set_event_loop(loop)
t = asyncio.Task(notmuch(), loop=loop)
self.assertIs(t._loop, loop)
+ loop.run_until_complete(t)
loop.close()
def test_async_coroutine(self):
self.set_event_loop(loop)
t = asyncio.async(notmuch(), loop=loop)
self.assertIs(t._loop, loop)
+ loop.run_until_complete(t)
loop.close()
def test_async_future(self):
t.add_done_callback(Dummy())
self.assertEqual(repr(t),
'<Task pending %s cb=[<Dummy>()]>' % coro)
+ self.loop.run_until_complete(t)
def test_task_basics(self):
@asyncio.coroutine