# client each send
chunk_size = 1
+ def __init__(self, event):
+ threading.Thread.__init__(self)
+ self.event = event
+
def run(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
global PORT
PORT = test_support.bind_port(sock, HOST, PORT)
sock.listen(1)
+ self.event.set()
conn, client = sock.accept()
self.buffer = ""
# collect data until quit message is seen
self.buffer = ""
+def start_echo_server():
+ event = threading.Event()
+ s = echo_server(event)
+ s.start()
+ event.wait()
+ event.clear()
+ time.sleep(0.01) # Give server time to start accepting.
+ return s, event
+
+
class TestAsynchat(unittest.TestCase):
usepoll = False
pass
def line_terminator_check(self, term, server_chunk):
- s = echo_server()
+ event = threading.Event()
+ s = echo_server(event)
s.chunk_size = server_chunk
s.start()
- time.sleep(0.5) # Give server time to initialize
+ event.wait()
+ event.clear()
+ time.sleep(0.01) # Give server time to start accepting.
c = echo_client(term)
c.push("hello ")
c.push("world%s" % term)
def numeric_terminator_check(self, termlen):
# Try reading a fixed number of bytes
- s = echo_server()
- s.start()
- time.sleep(0.5) # Give server time to initialize
+ s, event = start_echo_server()
c = echo_client(termlen)
data = "hello world, I'm not dead yet!\n"
c.push(data)
def test_none_terminator(self):
# Try reading a fixed number of bytes
- s = echo_server()
- s.start()
- time.sleep(0.5) # Give server time to initialize
+ s, event = start_echo_server()
c = echo_client(None)
data = "hello world, I'm not dead yet!\n"
c.push(data)
self.assertEqual(c.buffer, data)
def test_simple_producer(self):
- s = echo_server()
- s.start()
- time.sleep(0.5) # Give server time to initialize
+ s, event = start_echo_server()
c = echo_client('\n')
data = "hello world\nI'm not dead yet!\n"
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
def test_string_producer(self):
- s = echo_server()
- s.start()
- time.sleep(0.5) # Give server time to initialize
+ s, event = start_echo_server()
c = echo_client('\n')
data = "hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT)
def test_empty_line(self):
# checks that empty lines are handled correctly
- s = echo_server()
- s.start()
- time.sleep(0.5) # Give server time to initialize
+ s, event = start_echo_server()
c = echo_client('\n')
c.push("hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
def test_close_when_done(self):
- s = echo_server()
- s.start()
- time.sleep(0.5) # Give server time to initialize
+ s, event = start_echo_server()
c = echo_client('\n')
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)