import asyncio
import socket
+import time
import threading
import unittest
+from test import support
from test.test_asyncio import utils as test_utils
from test.test_asyncio import functional as func_tests
HELLO_MSG = b'1' * 1024 * 5 + b'\n'
def client(sock, addr):
+ for i in range(10):
+ time.sleep(0.2)
+ if srv.is_serving():
+ break
+ else:
+ raise RuntimeError
+
+ sock.settimeout(2)
sock.connect(addr)
sock.send(HELLO_MSG)
sock.recv_all(1)
await srv.serve_forever()
srv = self.loop.run_until_complete(asyncio.start_server(
- serve, '127.0.0.1', 0, loop=self.loop, start_serving=False))
+ serve, support.HOSTv4, 0, loop=self.loop, start_serving=False))
self.assertFalse(srv.is_serving())
started = threading.Event()
def client(sock, addr):
+ sock.settimeout(2)
started.wait(5)
sock.connect(addr)
sock.send(HELLO_MSG)