def sock_accept(self, sock):
return self._proactor.accept(sock)
- def _socketpair(self):
- raise NotImplementedError
-
def _close_self_pipe(self):
if self._self_reading_future is not None:
self._self_reading_future.cancel()
def _make_self_pipe(self):
# A self-socket, really. :-)
- self._ssock, self._csock = self._socketpair()
+ self._ssock, self._csock = socket.socketpair()
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
self._selector.close()
self._selector = None
- def _socketpair(self):
- raise NotImplementedError
-
def _close_self_pipe(self):
self._remove_reader(self._ssock.fileno())
self._ssock.close()
def _make_self_pipe(self):
# A self-socket, really. :-)
- self._ssock, self._csock = self._socketpair()
+ self._ssock, self._csock = socket.socketpair()
self._ssock.setblocking(False)
self._csock.setblocking(False)
self._internal_fds += 1
super().__init__(selector)
self._signal_handlers = {}
- def _socketpair(self):
- return socket.socketpair()
-
def close(self):
super().close()
for sig in list(self._signal_handlers):
# socket (which we use in order to detect closing of the
# other end). Notably this is needed on AIX, and works
# just fine on other platforms.
- stdin, stdin_w = self._loop._socketpair()
+ stdin, stdin_w = socket.socketpair()
# Mark the write end of the stdin pipe as non-inheritable,
# needed by close_fds=False on Python 3.3 and older
class _WindowsSelectorEventLoop(selector_events.BaseSelectorEventLoop):
"""Windows version of selector event loop."""
- def _socketpair(self):
- return windows_utils.socketpair()
-
class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
"""Windows version of proactor event loop using IOCP."""
proactor = IocpProactor()
super().__init__(proactor)
- def _socketpair(self):
- return windows_utils.socketpair()
-
@coroutine
def create_pipe_connection(self, protocol_factory, address):
f = self._proactor.connect_pipe(address)
self.ssock, self.csock = mock.Mock(), mock.Mock()
- class EventLoop(BaseProactorEventLoop):
- def _socketpair(s):
- return (self.ssock, self.csock)
-
- self.loop = EventLoop(self.proactor)
+ with mock.patch('asyncio.proactor_events.socket.socketpair',
+ return_value=(self.ssock, self.csock)):
+ self.loop = BaseProactorEventLoop(self.proactor)
self.set_event_loop(self.loop)
@mock.patch.object(BaseProactorEventLoop, 'call_soon')
- @mock.patch.object(BaseProactorEventLoop, '_socketpair')
+ @mock.patch('asyncio.proactor_events.socket.socketpair')
def test_ctor(self, socketpair, call_soon):
ssock, csock = socketpair.return_value = (
mock.Mock(), mock.Mock())
self.loop.sock_accept(self.sock)
self.proactor.accept.assert_called_with(self.sock)
- def test_socketpair(self):
- class EventLoop(BaseProactorEventLoop):
- # override the destructor to not log a ResourceWarning
- def __del__(self):
- pass
- self.assertRaises(
- NotImplementedError, EventLoop, self.proactor)
-
def test_make_socket_transport(self):
tr = self.loop._make_socket_transport(self.sock, asyncio.Protocol())
self.assertIsInstance(tr, _ProactorSocketTransport)
self.loop.close()
self.assertIsNone(self.loop._selector)
- def test_socketpair(self):
- self.assertRaises(NotImplementedError, self.loop._socketpair)
-
def test_read_from_self_tryagain(self):
self.loop._ssock.recv.side_effect = BlockingIOError
self.assertIsNone(self.loop._read_from_self())
import os
+import socket
import sys
import unittest
from unittest import mock
self.set_event_loop(self.loop)
def test_close(self):
- a, b = self.loop._socketpair()
+ a, b = socket.socketpair()
trans = self.loop._make_socket_transport(a, asyncio.Protocol())
f = asyncio.ensure_future(self.loop.sock_recv(b, 100))
trans.close()