class MyBaseProto(asyncio.Protocol):
+ connected = None
done = None
def __init__(self, loop=None):
self.state = 'INITIAL'
self.nbytes = 0
if loop is not None:
+ self.connected = asyncio.Future(loop=loop)
self.done = asyncio.Future(loop=loop)
def connection_made(self, transport):
self.transport = transport
assert self.state == 'INITIAL', self.state
self.state = 'CONNECTED'
+ if self.connected:
+ self.connected.set_result(None)
def data_received(self, data):
assert self.state == 'CONNECTED', self.state
def test_reader_callback(self):
r, w = test_utils.socketpair()
- bytes_read = []
+ r.setblocking(False)
+ bytes_read = bytearray()
def reader():
try:
# at least on Linux -- see man select.
return
if data:
- bytes_read.append(data)
+ bytes_read.extend(data)
else:
self.assertTrue(self.loop.remove_reader(r.fileno()))
r.close()
self.loop.add_reader(r.fileno(), reader)
self.loop.call_soon(w.send, b'abc')
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: len(bytes_read) >= 3)
self.loop.call_soon(w.send, b'def')
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: len(bytes_read) >= 6)
self.loop.call_soon(w.close)
self.loop.call_soon(self.loop.stop)
self.loop.run_forever()
- self.assertEqual(b''.join(bytes_read), b'abcdef')
+ self.assertEqual(bytes_read, b'abcdef')
def test_writer_callback(self):
r, w = test_utils.socketpair()
w.setblocking(False)
- self.loop.add_writer(w.fileno(), w.send, b'x'*(256*1024))
- test_utils.run_briefly(self.loop)
- def remove_writer():
- self.assertTrue(self.loop.remove_writer(w.fileno()))
+ def writer(data):
+ w.send(data)
+ self.loop.stop()
- self.loop.call_soon(remove_writer)
- self.loop.call_soon(self.loop.stop)
+ data = b'x' * 1024
+ self.loop.add_writer(w.fileno(), writer, data)
self.loop.run_forever()
+
+ self.assertTrue(self.loop.remove_writer(w.fileno()))
+ self.assertFalse(self.loop.remove_writer(w.fileno()))
+
w.close()
- data = r.recv(256*1024)
+ read = r.recv(len(data) * 2)
r.close()
- self.assertGreaterEqual(len(data), 200)
+ self.assertEqual(read, data)
def _basetest_sock_client_ops(self, httpd, sock):
sock.setblocking(False)
self.assertFalse(self.loop.remove_signal_handler(signal.SIGKILL))
# Now set a handler and handle it.
self.loop.add_signal_handler(signal.SIGINT, my_handler)
- test_utils.run_briefly(self.loop)
+
os.kill(os.getpid(), signal.SIGINT)
- test_utils.run_briefly(self.loop)
- self.assertEqual(caught, 1)
+ test_utils.run_until(self.loop, lambda: caught)
+
# Removing it should restore the default handler.
self.assertTrue(self.loop.remove_signal_handler(signal.SIGINT))
self.assertEqual(signal.getsignal(signal.SIGINT),
self.assertIn(str(httpd.address), cm.exception.strerror)
def test_create_server(self):
- proto = MyProto()
+ proto = MyProto(self.loop)
f = self.loop.create_server(lambda: proto, '0.0.0.0', 0)
server = self.loop.run_until_complete(f)
self.assertEqual(len(server.sockets), 1)
client = socket.socket()
client.connect(('127.0.0.1', port))
client.sendall(b'xxx')
- test_utils.run_briefly(self.loop)
- test_utils.run_until(self.loop, lambda: proto is not None, 10)
- self.assertIsInstance(proto, MyProto)
- self.assertEqual('INITIAL', proto.state)
- test_utils.run_briefly(self.loop)
+
+ self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
- timeout=10)
+
+ test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes)
# extra info is available
# close connection
proto.transport.close()
- test_utils.run_briefly(self.loop) # windows iocp
+ self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'), 'No UNIX Sockets')
def test_create_unix_server(self):
- proto = MyProto()
+ proto = MyProto(loop=self.loop)
server, path = self._make_unix_server(lambda: proto)
self.assertEqual(len(server.sockets), 1)
client = socket.socket(socket.AF_UNIX)
client.connect(path)
client.sendall(b'xxx')
- test_utils.run_briefly(self.loop)
- test_utils.run_until(self.loop, lambda: proto is not None, 10)
- self.assertIsInstance(proto, MyProto)
- self.assertEqual('INITIAL', proto.state)
- test_utils.run_briefly(self.loop)
+ self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
- timeout=10)
+ test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes)
# close connection
proto.transport.close()
- test_utils.run_briefly(self.loop) # windows iocp
+ self.loop.run_until_complete(proto.done)
self.assertEqual('CLOSED', proto.state)
client, pr = self.loop.run_until_complete(f_c)
client.write(b'xxx')
- test_utils.run_briefly(self.loop)
- self.assertIsInstance(proto, MyProto)
- test_utils.run_briefly(self.loop)
+ self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
- timeout=10)
+
+ test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes)
# extra info is available
client, pr = self.loop.run_until_complete(f_c)
client.write(b'xxx')
- test_utils.run_briefly(self.loop)
- self.assertIsInstance(proto, MyProto)
- test_utils.run_briefly(self.loop)
+ self.loop.run_until_complete(proto.connected)
self.assertEqual('CONNECTED', proto.state)
- test_utils.run_until(self.loop, lambda: proto.nbytes > 0,
- timeout=10)
+ test_utils.run_until(self.loop, lambda: proto.nbytes > 0)
self.assertEqual(3, proto.nbytes)
# close connection
self.assertEqual('INITIALIZED', client.state)
transport.sendto(b'xxx')
- for _ in range(1000):
- if server.nbytes:
- break
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: server.nbytes)
self.assertEqual(3, server.nbytes)
- for _ in range(1000):
- if client.nbytes:
- break
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: client.nbytes)
# received
self.assertEqual(8, client.nbytes)
self.loop.run_until_complete(connect())
os.write(wpipe, b'1')
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: proto.nbytes >= 1)
self.assertEqual(1, proto.nbytes)
os.write(wpipe, b'2345')
- test_utils.run_briefly(self.loop)
+ test_utils.run_until(self.loop, lambda: proto.nbytes >= 5)
self.assertEqual(['INITIAL', 'CONNECTED'], proto.state)
self.assertEqual(5, proto.nbytes)
self.assertEqual('CONNECTED', proto.state)
transport.write(b'1')
- test_utils.run_briefly(self.loop)
- data = os.read(rpipe, 1024)
+
+ data = bytearray()
+ def reader(data):
+ chunk = os.read(rpipe, 1024)
+ data += chunk
+ return len(data)
+
+ test_utils.run_until(self.loop, lambda: reader(data) >= 1)
self.assertEqual(b'1', data)
transport.write(b'2345')
- test_utils.run_briefly(self.loop)
- data = os.read(rpipe, 1024)
- self.assertEqual(b'2345', data)
+ test_utils.run_until(self.loop, lambda: reader(data) >= 5)
+ self.assertEqual(b'12345', data)
self.assertEqual('CONNECTED', proto.state)
os.close(rpipe)
self.assertEqual('CONNECTED', proto.state)
transport.write(b'1')
- test_utils.run_briefly(self.loop)
- data = os.read(master, 1024)
+
+ data = bytearray()
+ def reader(data):
+ chunk = os.read(master, 1024)
+ data += chunk
+ return len(data)
+
+ test_utils.run_until(self.loop, lambda: reader(data) >= 1,
+ timeout=10)
self.assertEqual(b'1', data)
transport.write(b'2345')
- test_utils.run_briefly(self.loop)
- data = os.read(master, 1024)
- self.assertEqual(b'2345', data)
+ test_utils.run_until(self.loop, lambda: reader(data) >= 5,
+ timeout=10)
+ self.assertEqual(b'12345', data)
self.assertEqual('CONNECTED', proto.state)
os.close(master)