"""Like call_later(), but uses an absolute time."""
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_at()")
+ if self._debug:
+ self._assert_is_current_event_loop()
timer = events.TimerHandle(when, callback, args, self)
heapq.heappush(self._scheduled, timer)
return timer
Any positional arguments after the callback will be passed to
the callback when it is called.
"""
+ return self._call_soon(callback, args, check_loop=True)
+
+ def _call_soon(self, callback, args, check_loop):
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_soon()")
+ if self._debug and check_loop:
+ self._assert_is_current_event_loop()
handle = events.Handle(callback, args, self)
self._ready.append(handle)
return handle
+ def _assert_is_current_event_loop(self):
+ """Asserts that this event loop is the current event loop.
+
+ Non-threadsafe methods of this class make this assumption and will
+ likely behave incorrectly when the assumption is violated.
+
+ Should only be called when (self._debug == True). The caller is
+ responsible for checking this condition for performance reasons.
+ """
+ if events.get_event_loop() is not self:
+ raise RuntimeError(
+ "non-threadsafe operation invoked on an event loop other "
+ "than the current one")
+
def call_soon_threadsafe(self, callback, *args):
"""XXX"""
- handle = self.call_soon(callback, *args)
+ handle = self._call_soon(callback, args, check_loop=False)
self._write_to_self()
return handle
# are really slow
self.assertLessEqual(dt, 0.9, dt)
+ def test_assert_is_current_event_loop(self):
+ def cb():
+ pass
+
+ other_loop = base_events.BaseEventLoop()
+ other_loop._selector = unittest.mock.Mock()
+ asyncio.set_event_loop(other_loop)
+
+ # raise RuntimeError if the event loop is different in debug mode
+ self.loop.set_debug(True)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_soon(cb)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_later(60, cb)
+ with self.assertRaises(RuntimeError):
+ self.loop.call_at(self.loop.time() + 60, cb)
+
+ # check disabled if debug mode is disabled
+ self.loop.set_debug(False)
+ self.loop.call_soon(cb)
+ self.loop.call_later(60, cb)
+ self.loop.call_at(self.loop.time() + 60, cb)
+
def test_run_once_in_executor_handle(self):
def cb():
pass