when called from coroutines or callbacks.
"""Run until stop() is called."""
self._check_closed()
if self.is_running():
- raise RuntimeError('Event loop is running.')
+ raise RuntimeError('This event loop is already running')
+ if events._get_running_loop() is not None:
+ raise RuntimeError(
+ 'Cannot run the event loop while another loop is running')
self._set_coroutine_wrapper(self._debug)
self._thread_id = threading.get_ident()
if self._asyncgens is not None:
sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
finalizer=self._asyncgen_finalizer_hook)
try:
+ events._set_running_loop(self)
while True:
self._run_once()
if self._stopping:
finally:
self._stopping = False
self._thread_id = None
+ events._set_running_loop(None)
self._set_coroutine_wrapper(False)
if self._asyncgens is not None:
sys.set_asyncgen_hooks(*old_agen_hooks)
_lock = threading.Lock()
+# A TLS for the running event loop, used by _get_running_loop.
+class _RunningLoop(threading.local):
+ _loop = None
+_running_loop = _RunningLoop()
+
+
+def _get_running_loop():
+ """Return the running event loop or None.
+
+ This is a low-level function intended to be used by event loops.
+ This function is thread-specific.
+ """
+ return _running_loop._loop
+
+
+def _set_running_loop(loop):
+ """Set the running event loop.
+
+ This is a low-level function intended to be used by event loops.
+ This function is thread-specific.
+ """
+ _running_loop._loop = loop
+
+
def _init_event_loop_policy():
global _event_loop_policy
with _lock:
def get_event_loop():
- """Equivalent to calling get_event_loop_policy().get_event_loop()."""
+ """Return an asyncio event loop.
+
+ When called from a coroutine or a callback (e.g. scheduled with call_soon
+ or similar API), this function will always return the running event loop.
+
+ If there is no running event loop set, the function will return
+ the result of `get_event_loop_policy().get_event_loop()` call.
+ """
+ current_loop = _get_running_loop()
+ if current_loop is not None:
+ return current_loop
return get_event_loop_policy().get_event_loop()
self.set_event_loop(loop)
return loop
+ def setUp(self):
+ self._get_running_loop = events._get_running_loop
+ events._get_running_loop = lambda: None
+
def tearDown(self):
+ events._get_running_loop = self._get_running_loop
+
events.set_event_loop(None)
# Detect CPython bug #23353: ensure that yield/yield-from is not used
class BaseEventLoopTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = base_events.BaseEventLoop()
self.loop._selector = mock.Mock()
self.loop._selector.select.return_value = ()
class BaseEventLoopWithSelectorTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
"took .* seconds$")
+class RunningLoopTests(unittest.TestCase):
+
+ def test_running_loop_within_a_loop(self):
+ @asyncio.coroutine
+ def runner(loop):
+ loop.run_forever()
+
+ loop = asyncio.new_event_loop()
+ outer_loop = asyncio.new_event_loop()
+ try:
+ with self.assertRaisesRegex(RuntimeError,
+ 'while another loop is running'):
+ outer_loop.run_until_complete(runner(loop))
+ finally:
+ loop.close()
+ outer_loop.close()
+
+
if __name__ == '__main__':
unittest.main()
class HandleTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = mock.Mock()
self.loop.get_debug.return_value = True
class TimerTests(unittest.TestCase):
def setUp(self):
+ super().setUp()
self.loop = mock.Mock()
def test_hash(self):
self.assertIs(policy, asyncio.get_event_loop_policy())
self.assertIsNot(policy, old_policy)
+ def test_get_event_loop_returns_running_loop(self):
+ class Policy(asyncio.DefaultEventLoopPolicy):
+ def get_event_loop(self):
+ raise NotImplementedError
+
+ loop = None
+
+ old_policy = asyncio.get_event_loop_policy()
+ try:
+ asyncio.set_event_loop_policy(Policy())
+ loop = asyncio.new_event_loop()
+
+ async def func():
+ self.assertIs(asyncio.get_event_loop(), loop)
+
+ loop.run_until_complete(func())
+ finally:
+ asyncio.set_event_loop_policy(old_policy)
+ if loop is not None:
+ loop.close()
+
if __name__ == '__main__':
unittest.main()
class DuckTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.addCleanup(self.loop.close)
class FutureTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.addCleanup(self.loop.close)
class FutureDoneCallbackTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
def run_briefly(self):
class LockTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
def test_ctor_loop(self):
class EventTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
def test_ctor_loop(self):
class ConditionTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
def test_ctor_loop(self):
class SemaphoreTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
def test_ctor_loop(self):
class BaseTest(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = asyncio.BaseEventLoop()
self.loop._process_events = mock.Mock()
self.loop._selector = mock.Mock()
class ProactorSocketTransportTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.addCleanup(self.loop.close)
self.proactor = mock.Mock()
class BaseProactorEventLoopTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
+
self.sock = test_utils.mock_nonblocking_socket()
self.proactor = mock.Mock()
class _QueueTestBase(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
class BaseSelectorEventLoopTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.selector = mock.Mock()
self.selector.select.return_value = []
self.loop = TestBaseSelectorEventLoop(self.selector)
class SelectorTransportTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.sock = mock.Mock(socket.socket)
class SelectorSocketTransportTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.sock = mock.Mock(socket.socket)
class SelectorSslTransportTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.sock = mock.Mock(socket.socket)
class SelectorDatagramTransportTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
self.sock = mock.Mock(spec_set=socket.socket)
class SslProtoHandshakeTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
DATA = b'line1\nline2\nline3\n'
def setUp(self):
+ super().setUp()
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop)
class SubprocessTransportTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.set_event_loop(self.loop)
Watcher = None
def setUp(self):
+ super().setUp()
policy = asyncio.get_event_loop_policy()
self.loop = policy.new_event_loop()
self.set_event_loop(self.loop)
class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = asyncio.ProactorEventLoop()
self.set_event_loop(self.loop)
class TaskTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
def test_other_loop_future(self):
class GatherTestsBase:
def setUp(self):
+ super().setUp()
self.one_loop = self.new_test_loop()
self.other_loop = self.new_test_loop()
self.set_event_loop(self.one_loop, cleanup=False)
"""Test case for asyncio.run_coroutine_threadsafe."""
def setUp(self):
+ super().setUp()
self.loop = asyncio.new_event_loop()
self.set_event_loop(self.loop) # Will cleanup properly
class SleepTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(None)
def tearDown(self):
self.loop.close()
self.loop = None
+ super().tearDown()
def test_sleep_zero(self):
result = 0
class SelectorEventLoopSignalTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = asyncio.SelectorEventLoop()
self.set_event_loop(self.loop)
class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = asyncio.SelectorEventLoop()
self.set_event_loop(self.loop)
class UnixReadPipeTransportTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
class UnixWritePipeTransportTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
ignore_warnings = mock.patch.object(log.logger, "warning")
def setUp(self):
+ super().setUp()
self.loop = self.new_test_loop()
self.running = False
self.zombies = {}
class ProactorTests(test_utils.TestCase):
def setUp(self):
+ super().setUp()
self.loop = asyncio.ProactorEventLoop()
self.set_event_loop(self.loop)
- Issue #28600: Optimize loop.call_soon().
+- Issue #28613: Fix get_event_loop() return the current loop if
+ called from coroutines/callbacks.
+
IDLE
----