Return the current time, as a :class:`float` value, according to the
event loop's internal clock.
+.. attribute:: BaseEventLoop.granularity
+
+ Granularity of the time: maximum between the resolution of the
+ :meth:`BaseEventLoop.time` method and the resolution of the selector (see
+ :attr:`selectors.BaseSelector.resolution`).
+
.. seealso::
The :func:`asyncio.sleep` function.
import concurrent.futures
import heapq
import logging
+import math
import socket
import subprocess
import time
self._default_executor = None
self._internal_fds = 0
self._running = False
+ self.granularity = time.get_clock_info('monotonic').resolution
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
elif self._scheduled:
# Compute the desired timeout.
when = self._scheduled[0]._when
+ # round deadline aways from zero
+ when = math.ceil(when / self.granularity) * self.granularity
deadline = max(0, when - self.time())
if timeout is None:
timeout = deadline
# Handle 'later' callbacks that are ready.
now = self.time()
+ # round current time aways from zero
+ now = math.ceil(now / self.granularity) * self.granularity
while self._scheduled:
handle = self._scheduled[0]
if handle._when > now:
selector = selectors.DefaultSelector()
logger.debug('Using selector: %s', selector.__class__.__name__)
self._selector = selector
+ self.granularity = max(selector.resolution, self.granularity)
self._make_self_pipe()
def _make_socket_transport(self, sock, protocol, waiter=None, *,
r.close()
w.close()
+ def test_timeout_rounding(self):
+ def _run_once():
+ self.loop._run_once_counter += 1
+ orig_run_once()
+
+ orig_run_once = self.loop._run_once
+ self.loop._run_once_counter = 0
+ self.loop._run_once = _run_once
+ calls = []
+
+ @tasks.coroutine
+ def wait():
+ loop = self.loop
+ calls.append(loop._run_once_counter)
+ yield from tasks.sleep(loop.granularity * 10, loop=loop)
+ calls.append(loop._run_once_counter)
+ yield from tasks.sleep(loop.granularity / 10, loop=loop)
+ calls.append(loop._run_once_counter)
+
+ self.loop.run_until_complete(wait())
+ calls.append(self.loop._run_once_counter)
+ self.assertEqual(calls, [1, 3, 5, 6])
+
class SubprocessTestsMixin:
class BaseSelectorEventLoopTests(unittest.TestCase):
def setUp(self):
- self.loop = TestBaseSelectorEventLoop(unittest.mock.Mock())
+ selector = unittest.mock.Mock()
+ selector.resolution = 1e-3
+ self.loop = TestBaseSelectorEventLoop(selector)
def test_make_socket_transport(self):
m = unittest.mock.Mock()
Library
-------
+- Issue #20311: asyncio: Add a granularity attribute to BaseEventLoop: maximum
+ between the resolution of the BaseEventLoop.time() method and the resolution
+ of the selector. The granuarility is used in the scheduler to round time and
+ deadline.
+
- Issue #20311: selectors: Add a resolution attribute to BaseSelector.
- Issue #20189: unittest.mock now no longer assumes that any object for