import os
import logging
+import time
import unittest
from unittest import mock
try:
raise NotImplementedError
def test_start_tls_client_1(self):
- HELLO_MSG = b'1' * 1024 * 1024 * 5
+ HELLO_MSG = b'1' * 1024 * 1024
server_context = test_utils.simple_server_sslcontext()
client_context = test_utils.simple_client_sslcontext()
def serve(sock):
+ sock.settimeout(5)
+
data = sock.recv_all(len(HELLO_MSG))
self.assertEqual(len(data), len(HELLO_MSG))
self.on_eof.set_result(True)
async def client(addr):
+ await asyncio.sleep(0.5, loop=self.loop)
+
on_data = self.loop.create_future()
on_eof = self.loop.create_future()
asyncio.wait_for(client(srv.addr), loop=self.loop, timeout=10))
def test_start_tls_server_1(self):
- HELLO_MSG = b'1' * 1024 * 1024 * 5
+ HELLO_MSG = b'1' * 1024 * 1024
server_context = test_utils.simple_server_sslcontext()
client_context = test_utils.simple_client_sslcontext()
def client(sock, addr):
+ time.sleep(0.5)
+ sock.settimeout(5)
+
sock.connect(addr)
data = sock.recv_all(len(HELLO_MSG))
self.assertEqual(len(data), len(HELLO_MSG))