self._internal_fds = 0
self._running = False
self._clock_resolution = time.get_clock_info('monotonic').resolution
+ self._exception_handler = None
def _make_socket_transport(self, sock, protocol, waiter=None, *,
extra=None, server=None):
"""Like call_later(), but uses an absolute time."""
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_at()")
- timer = events.TimerHandle(when, callback, args)
+ timer = events.TimerHandle(when, callback, args, self)
heapq.heappush(self._scheduled, timer)
return timer
"""
if tasks.iscoroutinefunction(callback):
raise TypeError("coroutines cannot be used with call_soon()")
- handle = events.Handle(callback, args)
+ handle = events.Handle(callback, args, self)
self._ready.append(handle)
return handle
protocol, popen_args, False, stdin, stdout, stderr, bufsize, **kwargs)
return transport, protocol
+ def set_exception_handler(self, handler):
+ """Set handler as the new event loop exception handler.
+
+ If handler is None, the default exception handler will
+ be set.
+
+ If handler is a callable object, it should have a
+ matching signature to '(loop, context)', where 'loop'
+ will be a reference to the active event loop, 'context'
+ will be a dict object (see `call_exception_handler()`
+ documentation for details about context).
+ """
+ if handler is not None and not callable(handler):
+ raise TypeError('A callable object or None is expected, '
+ 'got {!r}'.format(handler))
+ self._exception_handler = handler
+
+ def default_exception_handler(self, context):
+ """Default exception handler.
+
+ This is called when an exception occurs and no exception
+ handler is set, and can be called by a custom exception
+ handler that wants to defer to the default behavior.
+
+ context parameter has the same meaning as in
+ `call_exception_handler()`.
+ """
+ message = context.get('message')
+ if not message:
+ message = 'Unhandled exception in event loop'
+
+ exception = context.get('exception')
+ if exception is not None:
+ exc_info = (type(exception), exception, exception.__traceback__)
+ else:
+ exc_info = False
+
+ log_lines = [message]
+ for key in sorted(context):
+ if key in {'message', 'exception'}:
+ continue
+ log_lines.append('{}: {!r}'.format(key, context[key]))
+
+ logger.error('\n'.join(log_lines), exc_info=exc_info)
+
+ def call_exception_handler(self, context):
+ """Call the current event loop exception handler.
+
+ context is a dict object containing the following keys
+ (new keys maybe introduced later):
+ - 'message': Error message;
+ - 'exception' (optional): Exception object;
+ - 'future' (optional): Future instance;
+ - 'handle' (optional): Handle instance;
+ - 'protocol' (optional): Protocol instance;
+ - 'transport' (optional): Transport instance;
+ - 'socket' (optional): Socket instance.
+
+ Note: this method should not be overloaded in subclassed
+ event loops. For any custom exception handling, use
+ `set_exception_handler()` method.
+ """
+ if self._exception_handler is None:
+ try:
+ self.default_exception_handler(context)
+ except Exception:
+ # Second protection layer for unexpected errors
+ # in the default implementation, as well as for subclassed
+ # event loops with overloaded "default_exception_handler".
+ logger.error('Exception in default exception handler',
+ exc_info=True)
+ else:
+ try:
+ self._exception_handler(self, context)
+ except Exception as exc:
+ # Exception in the user set custom exception handler.
+ try:
+ # Let's try default handler.
+ self.default_exception_handler({
+ 'message': 'Unhandled error in exception handler',
+ 'exception': exc,
+ 'context': context,
+ })
+ except Exception:
+ # Guard 'default_exception_handler' in case it's
+ # overloaded.
+ logger.error('Exception in default exception handler '
+ 'while handling an unexpected error '
+ 'in custom exception handler',
+ exc_info=True)
+
def _add_callback(self, handle):
"""Add a Handle to ready or scheduled."""
assert isinstance(handle, events.Handle), 'A Handle is required here'
class Handle:
"""Object returned by callback registration methods."""
- __slots__ = ['_callback', '_args', '_cancelled']
+ __slots__ = ['_callback', '_args', '_cancelled', '_loop']
- def __init__(self, callback, args):
+ def __init__(self, callback, args, loop):
assert not isinstance(callback, Handle), 'A Handle is not a callback'
+ self._loop = loop
self._callback = callback
self._args = args
self._cancelled = False
def _run(self):
try:
self._callback(*self._args)
- except Exception:
- logger.exception('Exception in callback %s %r',
- self._callback, self._args)
+ except Exception as exc:
+ msg = 'Exception in callback {}{!r}'.format(self._callback,
+ self._args)
+ self._loop.call_exception_handler({
+ 'message': msg,
+ 'exception': exc,
+ 'handle': self,
+ })
self = None # Needed to break cycles when an exception occurs.
__slots__ = ['_when']
- def __init__(self, when, callback, args):
+ def __init__(self, when, callback, args, loop):
assert when is not None
- super().__init__(callback, args)
+ super().__init__(callback, args, loop)
self._when = when
def remove_signal_handler(self, sig):
raise NotImplementedError
+ # Error handlers.
+
+ def set_exception_handler(self, handler):
+ raise NotImplementedError
+
+ def default_exception_handler(self, context):
+ raise NotImplementedError
+
+ def call_exception_handler(self, context):
+ raise NotImplementedError
+
class AbstractEventLoopPolicy:
"""Abstract policy for accessing the event loop."""
in a discussion about closing files when they are collected.
"""
- __slots__ = ['exc', 'tb']
+ __slots__ = ['exc', 'tb', 'loop']
- def __init__(self, exc):
+ def __init__(self, exc, loop):
+ self.loop = loop
self.exc = exc
self.tb = None
def __del__(self):
if self.tb:
- logger.error('Future/Task exception was never retrieved:\n%s',
- ''.join(self.tb))
+ msg = 'Future/Task exception was never retrieved:\n{tb}'
+ context = {
+ 'message': msg.format(tb=''.join(self.tb)),
+ }
+ self.loop.call_exception_handler(context)
class Future:
# has consumed the exception
return
exc = self._exception
- logger.error('Future/Task exception was never retrieved:',
- exc_info=(exc.__class__, exc, exc.__traceback__))
+ context = {
+ 'message': 'Future/Task exception was never retrieved',
+ 'exception': exc,
+ 'future': self,
+ }
+ self._loop.call_exception_handler(context)
def cancel(self):
"""Cancel the future and schedule callbacks.
if _PY34:
self._log_traceback = True
else:
- self._tb_logger = _TracebackLogger(exception)
+ self._tb_logger = _TracebackLogger(exception, self._loop)
# Arrange for the logger to be activated after all callbacks
# have had a chance to call result() or exception().
self._loop.call_soon(self._tb_logger.activate)
def _fatal_error(self, exc):
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
- logger.exception('Fatal error for %s', self)
+ self._loop.call_exception_handler({
+ 'message': 'Fatal transport error',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
self._force_close(exc)
def _force_close(self, exc):
self._protocol_paused = True
try:
self._protocol.pause_writing()
- except Exception:
- logger.exception('pause_writing() failed')
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.pause_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
def _maybe_resume_protocol(self):
if (self._protocol_paused and
self._protocol_paused = False
try:
self._protocol.resume_writing()
- except Exception:
- logger.exception('resume_writing() failed')
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.resume_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
conn, protocol,
extra={'peername': addr}, server=server)
f = self._proactor.accept(sock)
- except OSError:
+ except OSError as exc:
if sock.fileno() != -1:
- logger.exception('Accept failed')
+ self.call_exception_handler({
+ 'message': 'Accept failed',
+ 'exception': exc,
+ 'socket': sock,
+ })
sock.close()
except futures.CancelledError:
sock.close()
# Some platforms (e.g. Linux keep reporting the FD as
# ready, so we remove the read handler temporarily.
# We'll try again in a while.
- logger.exception('Accept out of system resource (%s)', exc)
+ self.call_exception_handler({
+ 'message': 'socket.accept() out of system resource',
+ 'exception': exc,
+ 'socket': sock,
+ })
self.remove_reader(sock.fileno())
self.call_later(constants.ACCEPT_RETRY_DELAY,
self._start_serving,
def add_reader(self, fd, callback, *args):
"""Add a reader callback."""
- handle = events.Handle(callback, args)
+ handle = events.Handle(callback, args, self)
try:
key = self._selector.get_key(fd)
except KeyError:
def add_writer(self, fd, callback, *args):
"""Add a writer callback.."""
- handle = events.Handle(callback, args)
+ handle = events.Handle(callback, args, self)
try:
key = self._selector.get_key(fd)
except KeyError:
self._protocol_paused = True
try:
self._protocol.pause_writing()
- except Exception:
- logger.exception('pause_writing() failed')
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.pause_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
def _maybe_resume_protocol(self):
if (self._protocol_paused and
self._protocol_paused = False
try:
self._protocol.resume_writing()
- except Exception:
- logger.exception('resume_writing() failed')
+ except Exception as exc:
+ self._loop.call_exception_handler({
+ 'message': 'protocol.resume_writing() failed',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
def set_write_buffer_limits(self, high=None, low=None):
if high is None:
def _fatal_error(self, exc):
# Should be called from exception handler only.
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
- logger.exception('Fatal error for %s', self)
+ self._loop.call_exception_handler({
+ 'message': 'Fatal transport error',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
self._force_close(exc)
def _force_close(self, exc):
import contextlib
import io
import os
+import re
import socket
import socketserver
import sys
raise AssertionError("Time generator is not finished")
def add_reader(self, fd, callback, *args):
- self.readers[fd] = events.Handle(callback, args)
+ self.readers[fd] = events.Handle(callback, args, self)
def remove_reader(self, fd):
self.remove_reader_count[fd] += 1
handle._args, args)
def add_writer(self, fd, callback, *args):
- self.writers[fd] = events.Handle(callback, args)
+ self.writers[fd] = events.Handle(callback, args, self)
def remove_writer(self, fd):
self.remove_writer_count[fd] += 1
def MockCallback(**kwargs):
return unittest.mock.Mock(spec=['__call__'], **kwargs)
+
+
+class MockPattern(str):
+ """A regex based str with a fuzzy __eq__.
+
+ Use this helper with 'mock.assert_called_with', or anywhere
+ where a regexp comparison between strings is needed.
+
+ For instance:
+ mock_call.assert_called_with(MockPattern('spam.*ham'))
+ """
+ def __eq__(self, other):
+ return bool(re.search(str(self), other, re.S))
except ValueError as exc:
raise RuntimeError(str(exc))
- handle = events.Handle(callback, args)
+ handle = events.Handle(callback, args, self)
self._signal_handlers[sig] = handle
try:
def _fatal_error(self, exc):
# should be called by exception handler only
if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
- logger.exception('Fatal error for %s', self)
+ self._loop.call_exception_handler({
+ 'message': 'Fatal transport error',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
self._close(exc)
def _close(self, exc):
def _fatal_error(self, exc):
# should be called by exception handler only
if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
- logger.exception('Fatal error for %s', self)
+ self._loop.call_exception_handler({
+ 'message': 'Fatal transport error',
+ 'exception': exc,
+ 'transport': self,
+ 'protocol': self._protocol,
+ })
self._close(exc)
def _close(self, exc=None):
def _sig_chld(self):
try:
self._do_waitpid_all()
- except Exception:
- logger.exception('Unknown exception in SIGCHLD handler')
+ except Exception as exc:
+ # self._loop should always be available here
+ # as '_sig_chld' is added as a signal handler
+ # in 'attach_loop'
+ self._loop.call_exception_handler({
+ 'message': 'Unknown exception in SIGCHLD handler',
+ 'exception': exc,
+ })
def _compute_returncode(self, status):
if os.WIFSIGNALED(status):
if pipe is None:
return
f = self._proactor.accept_pipe(pipe)
- except OSError:
+ except OSError as exc:
if pipe and pipe.fileno() != -1:
- logger.exception('Pipe accept failed')
+ self.call_exception_handler({
+ 'message': 'Pipe accept failed',
+ 'exception': exc,
+ 'pipe': pipe,
+ })
pipe.close()
except futures.CancelledError:
if pipe:
from asyncio import test_utils
+MOCK_ANY = unittest.mock.ANY
+PY34 = sys.version_info >= (3, 4)
+
+
class BaseEventLoopTests(unittest.TestCase):
def setUp(self):
self.assertRaises(NotImplementedError, next, iter(gen))
def test__add_callback_handle(self):
- h = asyncio.Handle(lambda: False, ())
+ h = asyncio.Handle(lambda: False, (), self.loop)
self.loop._add_callback(h)
self.assertFalse(self.loop._scheduled)
self.assertIn(h, self.loop._ready)
def test__add_callback_timer(self):
- h = asyncio.TimerHandle(time.monotonic()+10, lambda: False, ())
+ h = asyncio.TimerHandle(time.monotonic()+10, lambda: False, (),
+ self.loop)
self.loop._add_callback(h)
self.assertIn(h, self.loop._scheduled)
def test__add_callback_cancelled_handle(self):
- h = asyncio.Handle(lambda: False, ())
+ h = asyncio.Handle(lambda: False, (), self.loop)
h.cancel()
self.loop._add_callback(h)
self.assertRaises(
AssertionError, self.loop.run_in_executor,
- None, asyncio.Handle(cb, ()), ('',))
+ None, asyncio.Handle(cb, (), self.loop), ('',))
self.assertRaises(
AssertionError, self.loop.run_in_executor,
- None, asyncio.TimerHandle(10, cb, ()))
+ None, asyncio.TimerHandle(10, cb, (), self.loop))
def test_run_once_in_executor_cancelled(self):
def cb():
pass
- h = asyncio.Handle(cb, ())
+ h = asyncio.Handle(cb, (), self.loop)
h.cancel()
f = self.loop.run_in_executor(None, h)
def test_run_once_in_executor_plain(self):
def cb():
pass
- h = asyncio.Handle(cb, ())
+ h = asyncio.Handle(cb, (), self.loop)
f = asyncio.Future(loop=self.loop)
executor = unittest.mock.Mock()
executor.submit.return_value = f
f.cancel() # Don't complain about abandoned Future.
def test__run_once(self):
- h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, ())
- h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, ())
+ h1 = asyncio.TimerHandle(time.monotonic() + 5.0, lambda: True, (),
+ self.loop)
+ h2 = asyncio.TimerHandle(time.monotonic() + 10.0, lambda: True, (),
+ self.loop)
h1.cancel()
m_time.monotonic = monotonic
self.loop._scheduled.append(
- asyncio.TimerHandle(11.0, lambda: True, ()))
+ asyncio.TimerHandle(11.0, lambda: True, (), self.loop))
self.loop._process_events = unittest.mock.Mock()
self.loop._run_once()
self.assertEqual(logging.INFO, m_logger.log.call_args[0][0])
idx = -1
data = [10.0, 10.0, 10.3, 13.0]
- self.loop._scheduled = [asyncio.TimerHandle(11.0, lambda: True, ())]
+ self.loop._scheduled = [asyncio.TimerHandle(11.0, lambda: True, (),
+ self.loop)]
self.loop._run_once()
self.assertEqual(logging.DEBUG, m_logger.log.call_args[0][0])
processed = True
handle = loop.call_soon(lambda: True)
- h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,))
+ h = asyncio.TimerHandle(time.monotonic() - 1, cb, (self.loop,),
+ self.loop)
self.loop._process_events = unittest.mock.Mock()
self.loop._scheduled.append(h)
self.loop.run_until_complete, self.loop.subprocess_shell,
asyncio.SubprocessProtocol, 'exit 0', bufsize=4096)
+ def test_default_exc_handler_callback(self):
+ self.loop._process_events = unittest.mock.Mock()
+
+ def zero_error(fut):
+ fut.set_result(True)
+ 1/0
+
+ # Test call_soon (events.Handle)
+ with unittest.mock.patch('asyncio.base_events.logger') as log:
+ fut = asyncio.Future(loop=self.loop)
+ self.loop.call_soon(zero_error, fut)
+ fut.add_done_callback(lambda fut: self.loop.stop())
+ self.loop.run_forever()
+ log.error.assert_called_with(
+ test_utils.MockPattern('Exception in callback.*zero'),
+ exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
+
+ # Test call_later (events.TimerHandle)
+ with unittest.mock.patch('asyncio.base_events.logger') as log:
+ fut = asyncio.Future(loop=self.loop)
+ self.loop.call_later(0.01, zero_error, fut)
+ fut.add_done_callback(lambda fut: self.loop.stop())
+ self.loop.run_forever()
+ log.error.assert_called_with(
+ test_utils.MockPattern('Exception in callback.*zero'),
+ exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
+
+ def test_default_exc_handler_coro(self):
+ self.loop._process_events = unittest.mock.Mock()
+
+ @asyncio.coroutine
+ def zero_error_coro():
+ yield from asyncio.sleep(0.01, loop=self.loop)
+ 1/0
+
+ # Test Future.__del__
+ with unittest.mock.patch('asyncio.base_events.logger') as log:
+ fut = asyncio.async(zero_error_coro(), loop=self.loop)
+ fut.add_done_callback(lambda *args: self.loop.stop())
+ self.loop.run_forever()
+ fut = None # Trigger Future.__del__ or futures._TracebackLogger
+ if PY34:
+ # Future.__del__ in Python 3.4 logs error with
+ # an actual exception context
+ log.error.assert_called_with(
+ test_utils.MockPattern('.*exception was never retrieved'),
+ exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
+ else:
+ # futures._TracebackLogger logs only textual traceback
+ log.error.assert_called_with(
+ test_utils.MockPattern(
+ '.*exception was never retrieved.*ZeroDiv'),
+ exc_info=False)
+
+ def test_set_exc_handler_invalid(self):
+ with self.assertRaisesRegex(TypeError, 'A callable object or None'):
+ self.loop.set_exception_handler('spam')
+
+ def test_set_exc_handler_custom(self):
+ def zero_error():
+ 1/0
+
+ def run_loop():
+ self.loop.call_soon(zero_error)
+ self.loop._run_once()
+
+ self.loop._process_events = unittest.mock.Mock()
+
+ mock_handler = unittest.mock.Mock()
+ self.loop.set_exception_handler(mock_handler)
+ run_loop()
+ mock_handler.assert_called_with(self.loop, {
+ 'exception': MOCK_ANY,
+ 'message': test_utils.MockPattern(
+ 'Exception in callback.*zero_error'),
+ 'handle': MOCK_ANY,
+ })
+ mock_handler.reset_mock()
+
+ self.loop.set_exception_handler(None)
+ with unittest.mock.patch('asyncio.base_events.logger') as log:
+ run_loop()
+ log.error.assert_called_with(
+ test_utils.MockPattern(
+ 'Exception in callback.*zero'),
+ exc_info=(ZeroDivisionError, MOCK_ANY, MOCK_ANY))
+
+ assert not mock_handler.called
+
+ def test_set_exc_handler_broken(self):
+ def run_loop():
+ def zero_error():
+ 1/0
+ self.loop.call_soon(zero_error)
+ self.loop._run_once()
+
+ def handler(loop, context):
+ raise AttributeError('spam')
+
+ self.loop._process_events = unittest.mock.Mock()
+
+ self.loop.set_exception_handler(handler)
+
+ with unittest.mock.patch('asyncio.base_events.logger') as log:
+ run_loop()
+ log.error.assert_called_with(
+ test_utils.MockPattern(
+ 'Unhandled error in exception handler'),
+ exc_info=(AttributeError, MOCK_ANY, MOCK_ANY))
+
+ def test_default_exc_handler_broken(self):
+ _context = None
+
+ class Loop(base_events.BaseEventLoop):
+
+ _selector = unittest.mock.Mock()
+ _process_events = unittest.mock.Mock()
+
+ def default_exception_handler(self, context):
+ nonlocal _context
+ _context = context
+ # Simulates custom buggy "default_exception_handler"
+ raise ValueError('spam')
+
+ loop = Loop()
+ asyncio.set_event_loop(loop)
+
+ def run_loop():
+ def zero_error():
+ 1/0
+ loop.call_soon(zero_error)
+ loop._run_once()
+
+ with unittest.mock.patch('asyncio.base_events.logger') as log:
+ run_loop()
+ log.error.assert_called_with(
+ 'Exception in default exception handler',
+ exc_info=True)
+
+ def custom_handler(loop, context):
+ raise ValueError('ham')
+
+ _context = None
+ loop.set_exception_handler(custom_handler)
+ with unittest.mock.patch('asyncio.base_events.logger') as log:
+ run_loop()
+ log.error.assert_called_with(
+ test_utils.MockPattern('Exception in default exception.*'
+ 'while handling.*in custom'),
+ exc_info=True)
+
+ # Check that original context was passed to default
+ # exception handler.
+ self.assertIn('context', _context)
+ self.assertIs(type(_context['context']['exception']),
+ ZeroDivisionError)
+
class MyProto(asyncio.Protocol):
done = None
self.loop._accept_connection(MyProto, sock)
self.assertFalse(sock.close.called)
- @unittest.mock.patch('asyncio.selector_events.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_accept_connection_exception(self, m_log):
sock = unittest.mock.Mock()
sock.fileno.return_value = 10
self.loop.call_later = unittest.mock.Mock()
self.loop._accept_connection(MyProto, sock)
- self.assertTrue(m_log.exception.called)
+ self.assertTrue(m_log.error.called)
self.assertFalse(sock.close.called)
self.loop.remove_reader.assert_called_with(10)
self.loop.call_later.assert_called_with(constants.ACCEPT_RETRY_DELAY,
return args
args = ()
- h = asyncio.Handle(callback, args)
+ h = asyncio.Handle(callback, args, unittest.mock.Mock())
self.assertIs(h._callback, callback)
self.assertIs(h._args, args)
self.assertFalse(h._cancelled)
'<function HandleTests.test_handle.<locals>.callback'))
self.assertTrue(r.endswith('())<cancelled>'), r)
- def test_handle(self):
+ def test_handle_from_handle(self):
def callback(*args):
return args
- h1 = asyncio.Handle(callback, ())
+ m_loop = object()
+ h1 = asyncio.Handle(callback, (), loop=m_loop)
self.assertRaises(
- AssertionError, asyncio.Handle, h1, ())
+ AssertionError, asyncio.Handle, h1, (), m_loop)
- @unittest.mock.patch('asyncio.events.logger')
- def test_callback_with_exception(self, log):
+ def test_callback_with_exception(self):
def callback():
raise ValueError()
- h = asyncio.Handle(callback, ())
+ m_loop = unittest.mock.Mock()
+ m_loop.call_exception_handler = unittest.mock.Mock()
+
+ h = asyncio.Handle(callback, (), m_loop)
h._run()
- self.assertTrue(log.exception.called)
+
+ m_loop.call_exception_handler.assert_called_with({
+ 'message': test_utils.MockPattern('Exception in callback.*'),
+ 'exception': unittest.mock.ANY,
+ 'handle': h
+ })
class TimerTests(unittest.TestCase):
def test_hash(self):
when = time.monotonic()
- h = asyncio.TimerHandle(when, lambda: False, ())
+ h = asyncio.TimerHandle(when, lambda: False, (),
+ unittest.mock.Mock())
self.assertEqual(hash(h), hash(when))
def test_timer(self):
args = ()
when = time.monotonic()
- h = asyncio.TimerHandle(when, callback, args)
+ h = asyncio.TimerHandle(when, callback, args, unittest.mock.Mock())
self.assertIs(h._callback, callback)
self.assertIs(h._args, args)
self.assertFalse(h._cancelled)
self.assertTrue(r.endswith('())<cancelled>'), r)
self.assertRaises(AssertionError,
- asyncio.TimerHandle, None, callback, args)
+ asyncio.TimerHandle, None, callback, args,
+ unittest.mock.Mock())
def test_timer_comparison(self):
+ loop = unittest.mock.Mock()
+
def callback(*args):
return args
when = time.monotonic()
- h1 = asyncio.TimerHandle(when, callback, ())
- h2 = asyncio.TimerHandle(when, callback, ())
+ h1 = asyncio.TimerHandle(when, callback, (), loop)
+ h2 = asyncio.TimerHandle(when, callback, (), loop)
# TODO: Use assertLess etc.
self.assertFalse(h1 < h2)
self.assertFalse(h2 < h1)
h2.cancel()
self.assertFalse(h1 == h2)
- h1 = asyncio.TimerHandle(when, callback, ())
- h2 = asyncio.TimerHandle(when + 10.0, callback, ())
+ h1 = asyncio.TimerHandle(when, callback, (), loop)
+ h2 = asyncio.TimerHandle(when + 10.0, callback, (), loop)
self.assertTrue(h1 < h2)
self.assertFalse(h2 < h1)
self.assertTrue(h1 <= h2)
self.assertFalse(h1 == h2)
self.assertTrue(h1 != h2)
- h3 = asyncio.Handle(callback, ())
+ h3 = asyncio.Handle(callback, (), loop)
self.assertIs(NotImplemented, h1.__eq__(h3))
self.assertIs(NotImplemented, h1.__ne__(h3))
self.assertRaises(AssertionError, test)
fut.cancel()
- @unittest.mock.patch('asyncio.futures.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_tb_logger_abandoned(self, m_log):
fut = asyncio.Future(loop=self.loop)
del fut
self.assertFalse(m_log.error.called)
- @unittest.mock.patch('asyncio.futures.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_tb_logger_result_unretrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_result(42)
del fut
self.assertFalse(m_log.error.called)
- @unittest.mock.patch('asyncio.futures.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_tb_logger_result_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_result(42)
del fut
self.assertFalse(m_log.error.called)
- @unittest.mock.patch('asyncio.futures.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_tb_logger_exception_unretrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
test_utils.run_briefly(self.loop)
self.assertTrue(m_log.error.called)
- @unittest.mock.patch('asyncio.futures.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_tb_logger_exception_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
del fut
self.assertFalse(m_log.error.called)
- @unittest.mock.patch('asyncio.futures.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_tb_logger_exception_result_retrieved(self, m_log):
fut = asyncio.Future(loop=self.loop)
fut.set_exception(RuntimeError('boom'))
test_utils.run_briefly(self.loop)
self.assertFalse(self.protocol.connection_lost.called)
- @unittest.mock.patch('asyncio.proactor_events.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_fatal_error(self, m_logging):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
tr._force_close = unittest.mock.Mock()
tr._fatal_error(None)
self.assertTrue(tr._force_close.called)
- self.assertTrue(m_logging.exception.called)
+ self.assertTrue(m_logging.error.called)
def test_force_close(self):
tr = _ProactorSocketTransport(self.loop, self.sock, self.protocol)
def test_process_events(self):
self.loop._process_events([])
- @unittest.mock.patch('asyncio.proactor_events.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_create_server(self, m_log):
pf = unittest.mock.Mock()
call_soon = self.loop.call_soon = unittest.mock.Mock()
fut.result.side_effect = OSError()
loop(fut)
self.assertTrue(self.sock.close.called)
- self.assertTrue(m_log.exception.called)
+ self.assertTrue(m_log.error.called)
def test_create_server_cancel(self):
pf = unittest.mock.Mock()
from asyncio.selector_events import _SelectorDatagramTransport
+MOCK_ANY = unittest.mock.ANY
+
+
class TestBaseSelectorEventLoop(BaseSelectorEventLoop):
def _make_self_pipe(self):
self.assertFalse(self.loop.readers)
self.assertEqual(1, self.loop.remove_reader_count[7])
- @unittest.mock.patch('asyncio.log.logger.exception')
+ @unittest.mock.patch('asyncio.log.logger.error')
def test_fatal_error(self, m_exc):
exc = OSError()
tr = _SelectorTransport(self.loop, self.sock, self.protocol, None)
tr._force_close = unittest.mock.Mock()
tr._fatal_error(exc)
- m_exc.assert_called_with('Fatal error for %s', tr)
+ m_exc.assert_called_with(
+ test_utils.MockPattern(
+ 'Fatal transport error\nprotocol:.*\ntransport:.*'),
+ exc_info=(OSError, MOCK_ANY, MOCK_ANY))
+
tr._force_close.assert_called_with(exc)
def test_connection_lost(self):
transport._write_ready()
transport._fatal_error.assert_called_with(err)
- @unittest.mock.patch('asyncio.selector_events.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_write_ready_exception_and_close(self, m_log):
self.sock.send.side_effect = OSError()
remove_writer = self.loop.remove_writer = unittest.mock.Mock()
self.assertFalse(transport._fatal_error.called)
self.assertTrue(self.protocol.error_received.called)
- @unittest.mock.patch('asyncio.log.logger.exception')
+ @unittest.mock.patch('asyncio.base_events.logger.error')
def test_fatal_error_connected(self, m_exc):
transport = _SelectorDatagramTransport(
self.loop, self.sock, self.protocol, ('0.0.0.0', 1))
err = ConnectionRefusedError()
transport._fatal_error(err)
self.assertFalse(self.protocol.error_received.called)
- m_exc.assert_called_with('Fatal error for %s', transport)
+ m_exc.assert_called_with(
+ test_utils.MockPattern(
+ 'Fatal transport error\nprotocol:.*\ntransport:.*'),
+ exc_info=(ConnectionRefusedError, MOCK_ANY, MOCK_ANY))
if __name__ == '__main__':
from asyncio import unix_events
+MOCK_ANY = unittest.mock.ANY
+
+
@unittest.skipUnless(signal, 'Signals are not supported')
class SelectorEventLoopSignalTests(unittest.TestCase):
self.loop._handle_signal(signal.NSIG + 1, ())
def test_handle_signal_cancelled_handler(self):
- h = asyncio.Handle(unittest.mock.Mock(), ())
+ h = asyncio.Handle(unittest.mock.Mock(), (),
+ loop=unittest.mock.Mock())
h.cancel()
self.loop._signal_handlers[signal.NSIG + 1] = h
self.loop.remove_signal_handler = unittest.mock.Mock()
signal.SIGINT, lambda: True)
@unittest.mock.patch('asyncio.unix_events.signal')
- @unittest.mock.patch('asyncio.unix_events.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_add_signal_handler_install_error2(self, m_logging, m_signal):
m_signal.NSIG = signal.NSIG
self.assertEqual(1, m_signal.set_wakeup_fd.call_count)
@unittest.mock.patch('asyncio.unix_events.signal')
- @unittest.mock.patch('asyncio.unix_events.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_add_signal_handler_install_error3(self, m_logging, m_signal):
class Err(OSError):
errno = errno.EINVAL
m_signal.signal.call_args[0])
@unittest.mock.patch('asyncio.unix_events.signal')
- @unittest.mock.patch('asyncio.unix_events.logger')
+ @unittest.mock.patch('asyncio.base_events.logger')
def test_remove_signal_handler_cleanup_error(self, m_logging, m_signal):
m_signal.NSIG = signal.NSIG
self.loop.add_signal_handler(signal.SIGHUP, lambda: True)
test_utils.run_briefly(self.loop)
self.assertFalse(self.protocol.data_received.called)
- @unittest.mock.patch('asyncio.log.logger.exception')
+ @unittest.mock.patch('asyncio.log.logger.error')
@unittest.mock.patch('os.read')
def test__read_ready_error(self, m_read, m_logexc):
tr = unix_events._UnixReadPipeTransport(
m_read.assert_called_with(5, tr.max_size)
tr._close.assert_called_with(err)
- m_logexc.assert_called_with('Fatal error for %s', tr)
+ m_logexc.assert_called_with(
+ test_utils.MockPattern(
+ 'Fatal transport error\nprotocol:.*\ntransport:.*'),
+ exc_info=(OSError, MOCK_ANY, MOCK_ANY))
@unittest.mock.patch('os.read')
def test_pause_reading(self, m_read):
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(2, sys.getrefcount(self.loop),
+ self.assertEqual(4, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
def test__call_connection_lost_with_err(self):
self.pipe.close.assert_called_with()
self.assertIsNone(tr._protocol)
+
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(2, sys.getrefcount(self.loop),
+ self.assertEqual(4, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
self.loop.assert_writer(5, tr._write_ready)
self.assertEqual([b'data'], tr._buffer)
- @unittest.mock.patch('asyncio.log.logger.exception')
+ @unittest.mock.patch('asyncio.log.logger.error')
@unittest.mock.patch('os.write')
def test__write_ready_err(self, m_write, m_logexc):
tr = unix_events._UnixWritePipeTransport(
self.assertFalse(self.loop.readers)
self.assertEqual([], tr._buffer)
self.assertTrue(tr._closing)
- m_logexc.assert_called_with('Fatal error for %s', tr)
+ m_logexc.assert_called_with(
+ test_utils.MockPattern(
+ 'Fatal transport error\nprotocol:.*\ntransport:.*'),
+ exc_info=(OSError, MOCK_ANY, MOCK_ANY))
self.assertEqual(1, tr._conn_lost)
test_utils.run_briefly(self.loop)
self.protocol.connection_lost.assert_called_with(err)
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(2, sys.getrefcount(self.loop),
+ self.assertEqual(4, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
def test__call_connection_lost_with_err(self):
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(2, sys.getrefcount(self.loop),
+ self.assertEqual(4, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
def test_close(self):
m.waitpid.side_effect = ValueError
with unittest.mock.patch.object(log.logger,
- "exception") as m_exception:
+ 'error') as m_error:
self.assertEqual(self.watcher._sig_chld(), None)
- self.assertTrue(m_exception.called)
+ self.assertTrue(m_error.called)
@waitpid_mocks
def test_sigchld_child_reaped_elsewhere(self, m):
New APIs: loop.create_unix_connection(), loop.create_unix_server(),
streams.open_unix_connection(), and streams.start_unix_server().
+- Issue #20681: Add new error handling API in asyncio. New APIs:
+ loop.set_exception_handler(), loop.default_exception_handler(), and
+ loop.call_exception_handler().
+
Tests
-----