:meth:`AbstractEventLoop.add_reader` method and then close the event loop::
import asyncio
- try:
- from socket import socketpair
- except ImportError:
- from asyncio.windows_utils import socketpair
+ from socket import socketpair
# Create a pair of connected file descriptors
rsock, wsock = socketpair()
the event loop ::
import asyncio
- try:
- from socket import socketpair
- except ImportError:
- from asyncio.windows_utils import socketpair
+ from socket import socketpair
# Create a pair of connected sockets
rsock, wsock = socketpair()
:func:`open_connection` function::
import asyncio
- try:
- from socket import socketpair
- except ImportError:
- from asyncio.windows_utils import socketpair
+ from socket import socketpair
@asyncio.coroutine
def wait_for_data(loop):
Changes in the Python API
-------------------------
+* The ``asyncio.windows_utils.socketpair()`` function has been
+ removed: use directly :func:`socket.socketpair` which is available on all
+ platforms since Python 3.5 (before, it wasn't available on Windows).
+ ``asyncio.windows_utils.socketpair()`` was just an alias to
+ ``socket.socketpair`` on Python 3.5 and newer.
+
* :mod:`asyncio`: The module doesn't export :mod:`selectors` and
:mod:`_overlapped` modules as ``asyncio.selectors`` and
``asyncio._overlapped``. Replace ``from asyncio import selectors`` with
from test import support
-if sys.platform == 'win32': # pragma: no cover
- from .windows_utils import socketpair
-else:
- from socket import socketpair # pragma: no cover
-
-
def dummy_ssl_context():
if ssl is None:
return None
import warnings
-__all__ = ['socketpair', 'pipe', 'Popen', 'PIPE', 'PipeHandle']
+__all__ = ['pipe', 'Popen', 'PIPE', 'PipeHandle']
# Constants/globals
_mmap_counter = itertools.count()
-if hasattr(socket, 'socketpair'):
- # Since Python 3.5, socket.socketpair() is now also available on Windows
- socketpair = socket.socketpair
-else:
- # Replacement for socket.socketpair()
- def socketpair(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
- """A socket pair usable as a self-pipe, for Windows.
-
- Origin: https://gist.github.com/4325783, by Geert Jansen.
- Public domain.
- """
- if family == socket.AF_INET:
- host = '127.0.0.1'
- elif family == socket.AF_INET6:
- host = '::1'
- else:
- raise ValueError("Only AF_INET and AF_INET6 socket address "
- "families are supported")
- if type != socket.SOCK_STREAM:
- raise ValueError("Only SOCK_STREAM socket type is supported")
- if proto != 0:
- raise ValueError("Only protocol zero is supported")
-
- # We create a connected TCP socket. Note the trick with setblocking(0)
- # that prevents us from having to create a thread.
- lsock = socket.socket(family, type, proto)
- try:
- lsock.bind((host, 0))
- lsock.listen(1)
- # On IPv6, ignore flow_info and scope_id
- addr, port = lsock.getsockname()[:2]
- csock = socket.socket(family, type, proto)
- try:
- csock.setblocking(False)
- try:
- csock.connect((addr, port))
- except (BlockingIOError, InterruptedError):
- pass
- csock.setblocking(True)
- ssock, _ = lsock.accept()
- except:
- csock.close()
- raise
- finally:
- lsock.close()
- return (ssock, csock)
-
-
# Replacement for os.pipe() using handles instead of fds
self.assertNotEqual(thread_id, threading.get_ident())
def test_reader_callback(self):
- r, w = test_utils.socketpair()
+ r, w = socket.socketpair()
r.setblocking(False)
bytes_read = bytearray()
self.assertEqual(bytes_read, b'abcdef')
def test_writer_callback(self):
- r, w = test_utils.socketpair()
+ r, w = socket.socketpair()
w.setblocking(False)
def writer(data):
@unittest.skipUnless(sys.platform != 'win32',
"Don't support pipes for Windows")
def test_write_pipe_disconnect_on_close(self):
- rsock, wsock = test_utils.socketpair()
+ rsock, wsock = socket.socketpair()
rsock.setblocking(False)
pipeobj = io.open(wsock.detach(), 'wb', 1024)
self.assertEqual('CLOSED', write_proto.state)
def test_prompt_cancellation(self):
- r, w = test_utils.socketpair()
+ r, w = socket.socketpair()
r.setblocking(False)
f = self.loop.sock_recv(r, 1)
ov = getattr(f, 'ov', None)
def test_remove_fds_after_closing(self):
loop = self.create_event_loop()
callback = lambda: None
- r, w = test_utils.socketpair()
+ r, w = socket.socketpair()
self.addCleanup(r.close)
self.addCleanup(w.close)
loop.add_reader(r, callback)
def test_add_fds_after_closing(self):
loop = self.create_event_loop()
callback = lambda: None
- r, w = test_utils.socketpair()
+ r, w = socket.socketpair()
self.addCleanup(r.close)
self.addCleanup(w.close)
loop.close()
from asyncio import test_support as support
-class WinsocketpairTests(unittest.TestCase):
-
- def check_winsocketpair(self, ssock, csock):
- csock.send(b'xxx')
- self.assertEqual(b'xxx', ssock.recv(1024))
- csock.close()
- ssock.close()
-
- def test_winsocketpair(self):
- ssock, csock = windows_utils.socketpair()
- self.check_winsocketpair(ssock, csock)
-
- @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 not supported or enabled')
- def test_winsocketpair_ipv6(self):
- ssock, csock = windows_utils.socketpair(family=socket.AF_INET6)
- self.check_winsocketpair(ssock, csock)
-
- @unittest.skipIf(hasattr(socket, 'socketpair'),
- 'socket.socketpair is available')
- @mock.patch('asyncio.windows_utils.socket')
- def test_winsocketpair_exc(self, m_socket):
- m_socket.AF_INET = socket.AF_INET
- m_socket.SOCK_STREAM = socket.SOCK_STREAM
- m_socket.socket.return_value.getsockname.return_value = ('', 12345)
- m_socket.socket.return_value.accept.return_value = object(), object()
- m_socket.socket.return_value.connect.side_effect = OSError()
-
- self.assertRaises(OSError, windows_utils.socketpair)
-
- def test_winsocketpair_invalid_args(self):
- self.assertRaises(ValueError,
- windows_utils.socketpair, family=socket.AF_UNSPEC)
- self.assertRaises(ValueError,
- windows_utils.socketpair, type=socket.SOCK_DGRAM)
- self.assertRaises(ValueError,
- windows_utils.socketpair, proto=1)
-
- @unittest.skipIf(hasattr(socket, 'socketpair'),
- 'socket.socketpair is available')
- @mock.patch('asyncio.windows_utils.socket')
- def test_winsocketpair_close(self, m_socket):
- m_socket.AF_INET = socket.AF_INET
- m_socket.SOCK_STREAM = socket.SOCK_STREAM
- sock = mock.Mock()
- m_socket.socket.return_value = sock
- sock.bind.side_effect = OSError
- self.assertRaises(OSError, windows_utils.socketpair)
- self.assertTrue(sock.close.called)
-
-
class PipeTests(unittest.TestCase):
def test_pipe_overlapped(self):
--- /dev/null
+The ``asyncio.windows_utils.socketpair()`` function has been removed: use
+directly :func:`socket.socketpair` which is available on all platforms since
+Python 3.5 (before, it wasn't available on Windows).
+``asyncio.windows_utils.socketpair()`` was just an alias to
+``socket.socketpair`` on Python 3.5 and newer.