import gc
import os.path
+import types
import unittest
from test.script_helper import assert_python_ok
self.assertRaises(ValueError, self.loop.run_until_complete,
asyncio.wait([], loop=self.loop))
+ def test_corowrapper_mocks_generator(self):
+
+ def check():
+ # A function that asserts various things.
+ # Called twice, with different debug flag values.
+
+ @asyncio.coroutine
+ def coro():
+ # The actual coroutine.
+ self.assertTrue(gen.gi_running)
+ yield from fut
+
+ # A completed Future used to run the coroutine.
+ fut = asyncio.Future(loop=self.loop)
+ fut.set_result(None)
+
+ # Call the coroutine.
+ gen = coro()
+
+ # Check some properties.
+ self.assertTrue(asyncio.iscoroutine(gen))
+ self.assertIsInstance(gen.gi_frame, types.FrameType)
+ self.assertFalse(gen.gi_running)
+ self.assertIsInstance(gen.gi_code, types.CodeType)
+
+ # Run it.
+ self.loop.run_until_complete(gen)
+
+ # The frame should have changed.
+ self.assertIsNone(gen.gi_frame)
+
+ # Save debug flag.
+ old_debug = asyncio.tasks._DEBUG
+ try:
+ # Test with debug flag cleared.
+ asyncio.tasks._DEBUG = False
+ check()
+
+ # Test with debug flag set.
+ asyncio.tasks._DEBUG = True
+ check()
+
+ finally:
+ # Restore original debug flag.
+ asyncio.tasks._DEBUG = old_debug
+
def test_yield_from_corowrapper(self):
old_debug = asyncio.tasks._DEBUG
asyncio.tasks._DEBUG = True