"requires_IEEE_754", "skip_unless_xattr", "requires_zlib",
"anticipate_failure", "load_package_tests", "detect_api_mismatch",
"check__all__", "requires_android_level", "requires_multiprocessing_queue",
+ "skip_unless_bind_unix_socket",
# sys
"is_jython", "is_android", "check_impl_detail", "unix_shell",
"setswitchinterval",
msg = "no non-broken extended attribute support"
return test if ok else unittest.skip(msg)(test)
+_bind_nix_socket_error = None
+def skip_unless_bind_unix_socket(test):
+ """Decorator for tests requiring a functional bind() for unix sockets."""
+ if not hasattr(socket, 'AF_UNIX'):
+ return unittest.skip('No UNIX Sockets')(test)
+ global _bind_nix_socket_error
+ if _bind_nix_socket_error is None:
+ path = TESTFN + "can_bind_unix_socket"
+ with socket.socket(socket.AF_UNIX) as sock:
+ try:
+ sock.bind(path)
+ _bind_nix_socket_error = False
+ except OSError as e:
+ _bind_nix_socket_error = e
+ finally:
+ unlink(path)
+ if _bind_nix_socket_error:
+ msg = 'Requires a functional unix bind(): %s' % _bind_nix_socket_error
+ return unittest.skip(msg)(test)
+ else:
+ return test
+
def fs_is_case_insensitive(directory):
"""Detects if the file system for the specified directory is case-insensitive."""
import unittest
from unittest import mock
import weakref
+from test import support
if sys.platform != 'win32':
import tty
sock = socket.socket()
self._basetest_sock_recv_into(httpd, sock)
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @support.skip_unless_bind_unix_socket
def test_unix_sock_client_ops(self):
with test_utils.run_test_unix_server() as httpd:
sock = socket.socket(socket.AF_UNIX)
lambda: MyProto(loop=self.loop), *httpd.address)
self._basetest_create_connection(conn_fut)
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @support.skip_unless_bind_unix_socket
def test_create_unix_connection(self):
# Issue #20682: On Mac OS X Tiger, getsockname() returns a
# zero-length address for UNIX socket.
self._test_create_ssl_connection(httpd, create_connection,
peername=httpd.address)
+ @support.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_ssl_unix_connection(self):
# Issue #20682: On Mac OS X Tiger, getsockname() returns a
# zero-length address for UNIX socket.
return server, path
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @support.skip_unless_bind_unix_socket
def test_create_unix_server(self):
proto = MyProto(loop=self.loop)
server, path = self._make_unix_server(lambda: proto)
# stop serving
server.close()
+ @support.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server_ssl(self):
proto = MyProto(loop=self.loop)
server, path = self._make_ssl_unix_server(
self.assertIsNone(proto.transport)
server.close()
+ @support.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server_ssl_verify_failed(self):
proto = MyProto(loop=self.loop)
server, path = self._make_ssl_unix_server(
proto.transport.close()
server.close()
+ @support.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server_ssl_verified(self):
proto = MyProto(loop=self.loop)
server, path = self._make_ssl_unix_server(
import threading
import unittest
from unittest import mock
+from test import support
try:
import ssl
except ImportError:
loop=self.loop)
self._basetest_open_connection(conn_fut)
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @support.skip_unless_bind_unix_socket
def test_open_unix_connection(self):
with test_utils.run_test_unix_server() as httpd:
conn_fut = asyncio.open_unix_connection(httpd.address,
self._basetest_open_connection_no_loop_ssl(conn_fut)
+ @support.skip_unless_bind_unix_socket
@unittest.skipIf(ssl is None, 'No ssl module')
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_open_unix_connection_no_loop_ssl(self):
with test_utils.run_test_unix_server(use_ssl=True) as httpd:
conn_fut = asyncio.open_unix_connection(
loop=self.loop)
self._basetest_open_connection_error(conn_fut)
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @support.skip_unless_bind_unix_socket
def test_open_unix_connection_error(self):
with test_utils.run_test_unix_server() as httpd:
conn_fut = asyncio.open_unix_connection(httpd.address,
server.stop()
self.assertEqual(msg, b"hello world!\n")
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
+ @support.skip_unless_bind_unix_socket
def test_start_unix_server(self):
class MyServer:
import threading
import unittest
from unittest import mock
+from test import support
if sys.platform == 'win32':
raise unittest.SkipTest('UNIX only')
self.loop = asyncio.SelectorEventLoop()
self.set_event_loop(self.loop)
+ @support.skip_unless_bind_unix_socket
def test_create_unix_server_existing_path_sock(self):
with test_utils.unix_socket_path() as path:
sock = socket.socket(socket.AF_UNIX)
srv.close()
self.loop.run_until_complete(srv.wait_closed())
+ @support.skip_unless_bind_unix_socket
def test_create_unix_server_pathlib(self):
with test_utils.unix_socket_path() as path:
path = pathlib.Path(path)
@unittest.skipUnless(hasattr(socket, 'SOCK_NONBLOCK'),
'no socket.SOCK_NONBLOCK (linux only)')
+ @support.skip_unless_bind_unix_socket
def test_create_unix_server_path_stream_bittype(self):
sock = socket.socket(
socket.AF_UNIX, socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
--- /dev/null
+The new test.support.skip_unless_bind_unix_socket() decorator is used here to
+skip asyncio tests that fail because the platform lacks a functional bind()
+function for unix domain sockets (as it is the case for non root users on the
+recent Android versions that run now SELinux in enforcing mode).
+