Patch by Коренберг Марк.
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+ # Check for abstract socket. `str` and `bytes` paths are supported.
+ if path[0] not in (0, '\x00'):
+ try:
+ if stat.S_ISSOCK(os.stat(path).st_mode):
+ os.remove(path)
+ except FileNotFoundError:
+ pass
+ except OSError as err:
+ # Directory may have permissions only to create socket.
+ logger.error('Unable to check or remove stale UNIX socket %r: %r', path, err)
+
try:
sock.bind(path)
except OSError as exc:
with test_utils.unix_socket_path() as path:
sock = socket.socket(socket.AF_UNIX)
sock.bind(path)
- with sock:
- coro = self.loop.create_unix_server(lambda: None, path)
- with self.assertRaisesRegex(OSError,
- 'Address.*is already in use'):
- self.loop.run_until_complete(coro)
+ sock.listen(1)
+ sock.close()
+
+ coro = self.loop.create_unix_server(lambda: None, path)
+ srv = self.loop.run_until_complete(coro)
+ srv.close()
+ self.loop.run_until_complete(srv.wait_closed())
def test_create_unix_server_existing_path_nonsock(self):
with tempfile.NamedTemporaryFile() as file:
- Issue #28372: Fix asyncio to support formatting of non-python coroutines.
+- Issue #28399: Remove UNIX socket from FS before binding.
+ Patch by Коренберг Марк.
+
IDLE
----