transport, protocol = yield from self._create_connection_transport(
sock, protocol_factory, ssl, server_hostname)
if self._debug:
+ # Get the socket from the transport because SSL transport closes
+ # the old socket and creates a new SSL socket
+ sock = transport.get_extra_info('socket')
logger.debug("%r connected to %s:%r: (%r, %r)",
sock, host, port, transport, protocol)
return transport, protocol
sock = socket.socket(af, socktype, proto)
except socket.error:
# Assume it's a bad family/type/protocol combination.
+ if self._debug:
+ logger.warning('create_server() failed to create '
+ 'socket.socket(%r, %r, %r)',
+ af, socktype, proto, exc_info=True)
continue
sockets.append(sock)
if reuse_address:
except ConnectionAbortedError as exc:
if not self._closing:
self._fatal_error(exc, 'Fatal read error on pipe transport')
+ elif self._loop.get_debug():
+ logger.debug("Read error on pipe transport while closing",
+ exc_info=True)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
try:
self._extra['sockname'] = sock.getsockname()
except (socket.error, AttributeError):
- pass
+ if self._loop.get_debug():
+ logger.warning("getsockname() failed on %r",
+ sock, exc_info=True)
if 'peername' not in self._extra:
try:
self._extra['peername'] = sock.getpeername()
except (socket.error, AttributeError):
- pass
+ if self._loop.get_debug():
+ logger.warning("getpeername() failed on %r",
+ sock, exc_info=True)
def can_write_eof(self):
return True
self._selector = None
def sock_recv(self, sock, n):
- if self.get_debug() and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
return self._proactor.recv(sock, n)
def sock_sendall(self, sock, data):
- if self.get_debug() and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
return self._proactor.send(sock, data)
def sock_connect(self, sock, address):
- if self.get_debug() and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
try:
base_events._check_resolved_address(sock, address)
except ValueError as err:
return self._proactor.connect(sock, address)
def sock_accept(self, sock):
- if self.get_debug() and sock.gettimeout() != 0:
- raise ValueError("the socket must be non-blocking")
return self._proactor.accept(sock)
def _socketpair(self):
except OSError as exc:
if sock.fileno() != -1:
self.call_exception_handler({
- 'message': 'Accept failed',
+ 'message': 'Accept failed on a socket',
'exception': exc,
'socket': sock,
})
sock.close()
+ elif self._debug:
+ logger.debug("Accept failed on socket %r",
+ sock, exc_info=True)
except futures.CancelledError:
sock.close()
else:
def __repr__(self):
info = [self.__class__.__name__, 'fd=%s' % self._sock_fd]
- polling = _test_selector_event(self._loop._selector,
- self._sock_fd, selectors.EVENT_READ)
- if polling:
- info.append('read=polling')
- else:
- info.append('read=idle')
+ # test if the transport was closed
+ if self._loop is not None:
+ polling = _test_selector_event(self._loop._selector,
+ self._sock_fd, selectors.EVENT_READ)
+ if polling:
+ info.append('read=polling')
+ else:
+ info.append('read=idle')
- polling = _test_selector_event(self._loop._selector,
- self._sock_fd, selectors.EVENT_WRITE)
- if polling:
- state = 'polling'
- else:
- state = 'idle'
+ polling = _test_selector_event(self._loop._selector,
+ self._sock_fd, selectors.EVENT_WRITE)
+ if polling:
+ state = 'polling'
+ else:
+ state = 'idle'
- bufsize = self.get_write_buffer_size()
- info.append('write=<%s, bufsize=%s>' % (state, bufsize))
+ bufsize = self.get_write_buffer_size()
+ info.append('write=<%s, bufsize=%s>' % (state, bufsize))
return '<%s>' % ' '.join(info)
def abort(self):
self._server_hostname = server_hostname
self._waiter = waiter
- self._rawsock = rawsock
self._sslcontext = sslcontext
self._paused = False
yield
finally:
logger.setLevel(old_level)
+
+def mock_nonblocking_socket():
+ """Create a mock of a non-blocking socket."""
+ sock = mock.Mock(socket.socket)
+ sock.gettimeout.return_value = 0.0
+ return sock
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
# should be called by exception handler only
- if not (isinstance(exc, OSError) and exc.errno == errno.EIO):
+ if (isinstance(exc, OSError) and exc.errno == errno.EIO):
+ if self._loop.get_debug():
+ logger.debug("%r: %s", self, message, exc_info=True)
+ else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
# should be called by exception handler only
- if not isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if isinstance(exc, (BrokenPipeError, ConnectionResetError)):
+ if self._loop.get_debug():
+ logger.debug("%r: %s", self, message, exc_info=True)
+ else:
self._loop.call_exception_handler({
'message': message,
'exception': exc,
except KeyError: # pragma: no cover
# May happen if .remove_child_handler() is called
# after os.waitpid() returns.
- pass
+ if self._loop.get_debug():
+ logger.warning("Child watcher got an unexpected pid: %r",
+ pid, exc_info=True)
else:
callback(pid, returncode, *args)
return
try:
_overlapped.UnregisterWait(self._wait_handle)
- except OSError as e:
- if e.winerror != _overlapped.ERROR_IO_PENDING:
- raise
+ except OSError as exc:
# ERROR_IO_PENDING is not an error, the wait was unregistered
+ if exc.winerror != _overlapped.ERROR_IO_PENDING:
+ context = {
+ 'message': 'Failed to unregister the wait handle',
+ 'exception': exc,
+ 'future': self,
+ }
+ if self._source_traceback:
+ context['source_traceback'] = self._source_traceback
+ self._loop.call_exception_handler(context)
self._wait_handle = None
self._iocp = None
self._ov = None
def __init__(self, address):
self._address = address
self._free_instances = weakref.WeakSet()
+ # initialize the pipe attribute before calling _server_pipe_handle()
+ # because this function can raise an exception and the destructor calls
+ # the close() method
+ self._pipe = None
+ self._accept_pipe_future = None
self._pipe = self._server_pipe_handle(True)
def _get_unconnected_pipe(self):
return pipe
def close(self):
+ if self._accept_pipe_future is not None:
+ self._accept_pipe_future.cancel()
+ self._accept_pipe_future = None
# Close all instances which have not been connected to by a client.
if self._address is not None:
for pipe in self._free_instances:
def start_serving_pipe(self, protocol_factory, address):
server = PipeServer(address)
- def loop(f=None):
+ def loop_accept_pipe(f=None):
pipe = None
try:
if f:
'pipe': pipe,
})
pipe.close()
+ elif self._debug:
+ logger.warning("Accept pipe failed on pipe %r",
+ pipe, exc_info=True)
except futures.CancelledError:
if pipe:
pipe.close()
else:
- f.add_done_callback(loop)
+ server._accept_pipe_future = f
+ f.add_done_callback(loop_accept_pipe)
- self.call_soon(loop)
+ self.call_soon(loop_accept_pipe)
return [server]
@coroutine
class _SelectorTransportMock:
_sock = None
+ def get_extra_info(self, key):
+ return mock.Mock()
+
def close(self):
self._sock.close()
import asyncio
+from asyncio import proactor_events
from asyncio import selector_events
from asyncio import test_utils
self.assertEqual(read, data)
def _basetest_sock_client_ops(self, httpd, sock):
- # in debug mode, socket operations must fail
- # if the socket is not in blocking mode
- self.loop.set_debug(True)
- sock.setblocking(True)
- with self.assertRaises(ValueError):
- self.loop.run_until_complete(
- self.loop.sock_connect(sock, httpd.address))
- with self.assertRaises(ValueError):
- self.loop.run_until_complete(
- self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
- with self.assertRaises(ValueError):
- self.loop.run_until_complete(
- self.loop.sock_recv(sock, 1024))
- with self.assertRaises(ValueError):
- self.loop.run_until_complete(
- self.loop.sock_accept(sock))
+ if not isinstance(self.loop, proactor_events.BaseProactorEventLoop):
+ # in debug mode, socket operations must fail
+ # if the socket is not in blocking mode
+ self.loop.set_debug(True)
+ sock.setblocking(True)
+ with self.assertRaises(ValueError):
+ self.loop.run_until_complete(
+ self.loop.sock_connect(sock, httpd.address))
+ with self.assertRaises(ValueError):
+ self.loop.run_until_complete(
+ self.loop.sock_sendall(sock, b'GET / HTTP/1.0\r\n\r\n'))
+ with self.assertRaises(ValueError):
+ self.loop.run_until_complete(
+ self.loop.sock_recv(sock, 1024))
+ with self.assertRaises(ValueError):
+ self.loop.run_until_complete(
+ self.loop.sock_accept(sock))
# test in non-blocking mode
sock.setblocking(False)
"Don't support pipes for Windows")
def test_write_pipe_disconnect_on_close(self):
rsock, wsock = test_utils.socketpair()
+ rsock.setblocking(False)
pipeobj = io.open(wsock.detach(), 'wb', 1024)
proto = MyWritePipeProto(loop=self.loop)
for sock_type in (socket.SOCK_STREAM, socket.SOCK_DGRAM):
sock = socket.socket(family, sock_type)
with sock:
+ sock.setblocking(False)
connect = self.loop.sock_connect(sock, address)
with self.assertRaises(ValueError) as cm:
self.loop.run_until_complete(connect)
self.loop.remove_reader = mock.Mock()
self.loop.remove_writer = mock.Mock()
waiter = asyncio.Future(loop=self.loop)
- transport = self.loop._make_ssl_transport(
- m, asyncio.Protocol(), m, waiter)
+ with test_utils.disable_logger():
+ transport = self.loop._make_ssl_transport(
+ m, asyncio.Protocol(), m, waiter)
self.assertIsInstance(transport, _SelectorSslTransport)
@mock.patch('asyncio.selector_events.ssl', None)
def test_write_to_self_tryagain(self):
self.loop._csock.send.side_effect = BlockingIOError
- self.assertIsNone(self.loop._write_to_self())
+ with test_utils.disable_logger():
+ self.assertIsNone(self.loop._write_to_self())
def test_write_to_self_exception(self):
# _write_to_self() swallows OSError
self.assertRaises(RuntimeError, self.loop._write_to_self)
def test_sock_recv(self):
- sock = mock.Mock()
+ sock = test_utils.mock_nonblocking_socket()
self.loop._sock_recv = mock.Mock()
f = self.loop.sock_recv(sock, 1024)
self.assertIs(err, f.exception())
def test_sock_sendall(self):
- sock = mock.Mock()
+ sock = test_utils.mock_nonblocking_socket()
self.loop._sock_sendall = mock.Mock()
f = self.loop.sock_sendall(sock, b'data')
self.loop._sock_sendall.call_args[0])
def test_sock_sendall_nodata(self):
- sock = mock.Mock()
+ sock = test_utils.mock_nonblocking_socket()
self.loop._sock_sendall = mock.Mock()
f = self.loop.sock_sendall(sock, b'')
self.loop.add_writer.call_args[0])
def test_sock_connect(self):
- sock = mock.Mock()
+ sock = test_utils.mock_nonblocking_socket()
self.loop._sock_connect = mock.Mock()
f = self.loop.sock_connect(sock, ('127.0.0.1', 8080))
self.assertIsInstance(f.exception(), OSError)
def test_sock_accept(self):
- sock = mock.Mock()
+ sock = test_utils.mock_nonblocking_socket()
self.loop._sock_accept = mock.Mock()
f = self.loop.sock_accept(sock)
transport = _SelectorSocketTransport(
self.loop, self.sock, self.protocol)
transport._force_close = mock.Mock()
- transport._read_ready()
+ with test_utils.disable_logger():
+ transport._read_ready()
transport._force_close.assert_called_with(err)
@mock.patch('logging.exception')
err = self.sslsock.recv.side_effect = ConnectionResetError()
transport = self._make_one()
transport._force_close = mock.Mock()
- transport._read_ready()
+ with test_utils.disable_logger():
+ transport._read_ready()
transport._force_close.assert_called_with(err)
def test_read_ready_recv_retry(self):
coro = write_stdin(proc, large_data)
# drain() must raise BrokenPipeError or ConnectionResetError
- self.assertRaises((BrokenPipeError, ConnectionResetError),
- self.loop.run_until_complete, coro)
+ with test_utils.disable_logger():
+ self.assertRaises((BrokenPipeError, ConnectionResetError),
+ self.loop.run_until_complete, coro)
self.loop.run_until_complete(proc.wait())
def test_communicate_ignore_broken_pipe(self):
proc, large_data = self.prepare_broken_pipe_test()
# communicate() must ignore BrokenPipeError when feeding stdin
- self.loop.run_until_complete(proc.communicate(large_data))
+ with test_utils.disable_logger():
+ self.loop.run_until_complete(proc.communicate(large_data))
self.loop.run_until_complete(proc.wait())