import sys
from test import test_support
-HOST = "127.0.0.1"
-PORT = 54322
+HOST = test_support.HOST
SERVER_QUIT = 'QUIT\n'
class echo_server(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.port = test_support.bind_port(self.sock)
def run(self):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(sock, HOST, PORT)
- sock.listen(1)
+ self.sock.listen(1)
self.event.set()
- conn, client = sock.accept()
+ conn, client = self.sock.accept()
self.buffer = ""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
pass
conn.close()
- sock.close()
+ self.sock.close()
class echo_client(asynchat.async_chat):
- def __init__(self, terminator):
+ def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self)
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect((HOST, PORT))
+ self.connect((HOST, server_port))
self.set_terminator(terminator)
self.buffer = ''
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
- c = echo_client(term)
+ c = echo_client(term, s.port)
c.push("hello ")
c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term)
def numeric_terminator_check(self, termlen):
# Try reading a fixed number of bytes
s, event = start_echo_server()
- c = echo_client(termlen)
+ c = echo_client(termlen, s.port)
data = "hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
def test_none_terminator(self):
# Try reading a fixed number of bytes
s, event = start_echo_server()
- c = echo_client(None)
+ c = echo_client(None, s.port)
data = "hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
def test_simple_producer(self):
s, event = start_echo_server()
- c = echo_client('\n')
+ c = echo_client('\n', s.port)
data = "hello world\nI'm not dead yet!\n"
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
c.push_with_producer(p)
def test_string_producer(self):
s, event = start_echo_server()
- c = echo_client('\n')
+ c = echo_client('\n', s.port)
data = "hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
def test_empty_line(self):
# checks that empty lines are handled correctly
s, event = start_echo_server()
- c = echo_client('\n')
+ c = echo_client('\n', s.port)
c.push("hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
def test_close_when_done(self):
s, event = start_echo_server()
- c = echo_client('\n')
+ c = echo_client('\n', s.port)
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
-import asyncore
-import unittest
-import select
-import os
-import socket
-import threading
-import sys
-import time
-
-from test import test_support
-from test.test_support import TESTFN, run_unittest, unlink
-from StringIO import StringIO
-
-HOST = "127.0.0.1"
-PORT = None
-
-class dummysocket:
- def __init__(self):
- self.closed = False
-
- def close(self):
- self.closed = True
-
- def fileno(self):
- return 42
-
-class dummychannel:
- def __init__(self):
- self.socket = dummysocket()
-
-class exitingdummy:
- def __init__(self):
- pass
-
- def handle_read_event(self):
- raise asyncore.ExitNow()
-
- handle_write_event = handle_read_event
- handle_expt_event = handle_read_event
-
-class crashingdummy:
- def __init__(self):
- self.error_handled = False
-
- def handle_read_event(self):
- raise Exception()
-
- handle_write_event = handle_read_event
- handle_expt_event = handle_read_event
-
- def handle_error(self):
- self.error_handled = True
-
-# used when testing senders; just collects what it gets until newline is sent
-def capture_server(evt, buf):
- try:
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
- serv.listen(5)
- conn, addr = serv.accept()
- except socket.timeout:
- pass
- else:
- n = 200
- while n > 0:
- r, w, e = select.select([conn], [], [])
- if r:
- data = conn.recv(10)
- # keep everything except for the newline terminator
- buf.write(data.replace('\n', ''))
- if '\n' in data:
- break
- n -= 1
- time.sleep(0.01)
-
- conn.close()
- finally:
- serv.close()
- PORT = None
- evt.set()
-
-
-class HelperFunctionTests(unittest.TestCase):
- def test_readwriteexc(self):
- # Check exception handling behavior of read, write and _exception
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore read/write/_exception calls
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
- self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
- self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- asyncore.read(tr2)
- self.assertEqual(tr2.error_handled, True)
-
- tr2 = crashingdummy()
- asyncore.write(tr2)
- self.assertEqual(tr2.error_handled, True)
-
- tr2 = crashingdummy()
- asyncore._exception(tr2)
- self.assertEqual(tr2.error_handled, True)
-
- # asyncore.readwrite uses constants in the select module that
- # are not present in Windows systems (see this thread:
- # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
- # These constants should be present as long as poll is available
-
- if hasattr(select, 'poll'):
- def test_readwrite(self):
- # Check that correct methods are called by readwrite()
-
- class testobj:
- def __init__(self):
- self.read = False
- self.write = False
- self.expt = False
-
- def handle_read_event(self):
- self.read = True
-
- def handle_write_event(self):
- self.write = True
-
- def handle_expt_event(self):
- self.expt = True
-
- def handle_error(self):
- self.error_handled = True
-
- for flag in (select.POLLIN, select.POLLPRI):
- tobj = testobj()
- self.assertEqual(tobj.read, False)
- asyncore.readwrite(tobj, flag)
- self.assertEqual(tobj.read, True)
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore readwrite call
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- asyncore.readwrite(tr2, flag)
- self.assertEqual(tr2.error_handled, True)
-
- tobj = testobj()
- self.assertEqual(tobj.write, False)
- asyncore.readwrite(tobj, select.POLLOUT)
- self.assertEqual(tobj.write, True)
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore readwrite call
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
- select.POLLOUT)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- asyncore.readwrite(tr2, select.POLLOUT)
- self.assertEqual(tr2.error_handled, True)
-
- for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):
- tobj = testobj()
- self.assertEqual(tobj.expt, False)
- asyncore.readwrite(tobj, flag)
- self.assertEqual(tobj.expt, True)
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore readwrite calls
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- asyncore.readwrite(tr2, flag)
- self.assertEqual(tr2.error_handled, True)
-
- def test_closeall(self):
- self.closeall_check(False)
-
- def test_closeall_default(self):
- self.closeall_check(True)
-
- def closeall_check(self, usedefault):
- # Check that close_all() closes everything in a given map
-
- l = []
- testmap = {}
- for i in range(10):
- c = dummychannel()
- l.append(c)
- self.assertEqual(c.socket.closed, False)
- testmap[i] = c
-
- if usedefault:
- socketmap = asyncore.socket_map
- try:
- asyncore.socket_map = testmap
- asyncore.close_all()
- finally:
- testmap, asyncore.socket_map = asyncore.socket_map, socketmap
- else:
- asyncore.close_all(testmap)
-
- self.assertEqual(len(testmap), 0)
-
- for c in l:
- self.assertEqual(c.socket.closed, True)
-
- def test_compact_traceback(self):
- try:
- raise Exception("I don't like spam!")
- except:
- real_t, real_v, real_tb = sys.exc_info()
- r = asyncore.compact_traceback()
- else:
- self.fail("Expected exception")
-
- (f, function, line), t, v, info = r
- self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
- self.assertEqual(function, 'test_compact_traceback')
- self.assertEqual(t, real_t)
- self.assertEqual(v, real_v)
- self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
-
-
-class DispatcherTests(unittest.TestCase):
- def setUp(self):
- pass
-
- def tearDown(self):
- asyncore.close_all()
-
- def test_basic(self):
- d = asyncore.dispatcher()
- self.assertEqual(d.readable(), True)
- self.assertEqual(d.writable(), True)
-
- def test_repr(self):
- d = asyncore.dispatcher()
- self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
-
- def test_log(self):
- d = asyncore.dispatcher()
-
- # capture output of dispatcher.log() (to stderr)
- fp = StringIO()
- stderr = sys.stderr
- l1 = "Lovely spam! Wonderful spam!"
- l2 = "I don't like spam!"
- try:
- sys.stderr = fp
- d.log(l1)
- d.log(l2)
- finally:
- sys.stderr = stderr
-
- lines = fp.getvalue().splitlines()
- self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
-
- def test_log_info(self):
- d = asyncore.dispatcher()
-
- # capture output of dispatcher.log_info() (to stdout via print)
- fp = StringIO()
- stdout = sys.stdout
- l1 = "Have you got anything without spam?"
- l2 = "Why can't she have egg bacon spam and sausage?"
- l3 = "THAT'S got spam in it!"
- try:
- sys.stdout = fp
- d.log_info(l1, 'EGGS')
- d.log_info(l2)
- d.log_info(l3, 'SPAM')
- finally:
- sys.stdout = stdout
-
- lines = fp.getvalue().splitlines()
- if __debug__:
- expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
- else:
- expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3]
-
- self.assertEquals(lines, expected)
-
- def test_unhandled(self):
- d = asyncore.dispatcher()
-
- # capture output of dispatcher.log_info() (to stdout via print)
- fp = StringIO()
- stdout = sys.stdout
- try:
- sys.stdout = fp
- d.handle_expt()
- d.handle_read()
- d.handle_write()
- d.handle_connect()
- d.handle_accept()
- finally:
- sys.stdout = stdout
-
- lines = fp.getvalue().splitlines()
- expected = ['warning: unhandled exception',
- 'warning: unhandled read event',
- 'warning: unhandled write event',
- 'warning: unhandled connect event',
- 'warning: unhandled accept event']
- self.assertEquals(lines, expected)
-
-
-
-class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
- def readable(self):
- return False
-
- def handle_connect(self):
- pass
-
-class DispatcherWithSendTests(unittest.TestCase):
- usepoll = False
-
- def setUp(self):
- pass
-
- def tearDown(self):
- asyncore.close_all()
-
- def test_send(self):
- self.evt = threading.Event()
- cap = StringIO()
- threading.Thread(target=capture_server, args=(self.evt,cap)).start()
-
- # wait until server thread has assigned a port number
- n = 1000
- while PORT is None and n > 0:
- time.sleep(0.01)
- n -= 1
-
- # wait a little longer for the server to initialize (it sometimes
- # refuses connections on slow machines without this wait)
- time.sleep(0.2)
-
- data = "Suppose there isn't a 16-ton weight?"
- d = dispatcherwithsend_noread()
- d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- d.connect((HOST, PORT))
-
- # give time for socket to connect
- time.sleep(0.1)
-
- d.send(data)
- d.send(data)
- d.send('\n')
-
- n = 1000
- while d.out_buffer and n > 0:
- asyncore.poll()
- n -= 1
-
- self.evt.wait()
-
- self.assertEqual(cap.getvalue(), data*2)
-
-
-class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
- usepoll = True
-
-if hasattr(asyncore, 'file_wrapper'):
- class FileWrapperTest(unittest.TestCase):
- def setUp(self):
- self.d = "It's not dead, it's sleeping!"
- file(TESTFN, 'w').write(self.d)
-
- def tearDown(self):
- unlink(TESTFN)
-
- def test_recv(self):
- fd = os.open(TESTFN, os.O_RDONLY)
- w = asyncore.file_wrapper(fd)
-
- self.assertEqual(w.fd, fd)
- self.assertEqual(w.fileno(), fd)
- self.assertEqual(w.recv(13), "It's not dead")
- self.assertEqual(w.read(6), ", it's")
- w.close()
- self.assertRaises(OSError, w.read, 1)
-
- def test_send(self):
- d1 = "Come again?"
- d2 = "I want to buy some cheese."
- fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
- w = asyncore.file_wrapper(fd)
-
- w.write(d1)
- w.send(d2)
- w.close()
- self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
-
-
-def test_main():
- tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
- DispatcherWithSendTests_UsePoll]
- if hasattr(asyncore, 'file_wrapper'):
- tests.append(FileWrapperTest)
-
- run_unittest(*tests)
-
-if __name__ == "__main__":
- test_main()
+import asyncore\r
+import unittest\r
+import select\r
+import os\r
+import socket\r
+import threading\r
+import sys\r
+import time\r
+\r
+from test import test_support\r
+from test.test_support import TESTFN, run_unittest, unlink\r
+from StringIO import StringIO\r
+\r
+HOST = test_support.HOST\r
+\r
+class dummysocket:\r
+ def __init__(self):\r
+ self.closed = False\r
+\r
+ def close(self):\r
+ self.closed = True\r
+\r
+ def fileno(self):\r
+ return 42\r
+\r
+class dummychannel:\r
+ def __init__(self):\r
+ self.socket = dummysocket()\r
+\r
+class exitingdummy:\r
+ def __init__(self):\r
+ pass\r
+\r
+ def handle_read_event(self):\r
+ raise asyncore.ExitNow()\r
+\r
+ handle_write_event = handle_read_event\r
+ handle_expt_event = handle_read_event\r
+\r
+class crashingdummy:\r
+ def __init__(self):\r
+ self.error_handled = False\r
+\r
+ def handle_read_event(self):\r
+ raise Exception()\r
+\r
+ handle_write_event = handle_read_event\r
+ handle_expt_event = handle_read_event\r
+\r
+ def handle_error(self):\r
+ self.error_handled = True\r
+\r
+# used when testing senders; just collects what it gets until newline is sent\r
+def capture_server(evt, buf, serv):\r
+ try:\r
+ serv.listen(5)\r
+ conn, addr = serv.accept()\r
+ except socket.timeout:\r
+ pass\r
+ else:\r
+ n = 200\r
+ while n > 0:\r
+ r, w, e = select.select([conn], [], [])\r
+ if r:\r
+ data = conn.recv(10)\r
+ # keep everything except for the newline terminator\r
+ buf.write(data.replace('\n', ''))\r
+ if '\n' in data:\r
+ break\r
+ n -= 1\r
+ time.sleep(0.01)\r
+\r
+ conn.close()\r
+ finally:\r
+ serv.close()\r
+ evt.set()\r
+\r
+\r
+class HelperFunctionTests(unittest.TestCase):\r
+ def test_readwriteexc(self):\r
+ # Check exception handling behavior of read, write and _exception\r
+\r
+ # check that ExitNow exceptions in the object handler method\r
+ # bubbles all the way up through asyncore read/write/_exception calls\r
+ tr1 = exitingdummy()\r
+ self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)\r
+ self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)\r
+ self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)\r
+\r
+ # check that an exception other than ExitNow in the object handler\r
+ # method causes the handle_error method to get called\r
+ tr2 = crashingdummy()\r
+ asyncore.read(tr2)\r
+ self.assertEqual(tr2.error_handled, True)\r
+\r
+ tr2 = crashingdummy()\r
+ asyncore.write(tr2)\r
+ self.assertEqual(tr2.error_handled, True)\r
+\r
+ tr2 = crashingdummy()\r
+ asyncore._exception(tr2)\r
+ self.assertEqual(tr2.error_handled, True)\r
+\r
+ # asyncore.readwrite uses constants in the select module that\r
+ # are not present in Windows systems (see this thread:\r
+ # http://mail.python.org/pipermail/python-list/2001-October/109973.html)\r
+ # These constants should be present as long as poll is available\r
+\r
+ if hasattr(select, 'poll'):\r
+ def test_readwrite(self):\r
+ # Check that correct methods are called by readwrite()\r
+\r
+ class testobj:\r
+ def __init__(self):\r
+ self.read = False\r
+ self.write = False\r
+ self.expt = False\r
+\r
+ def handle_read_event(self):\r
+ self.read = True\r
+\r
+ def handle_write_event(self):\r
+ self.write = True\r
+\r
+ def handle_expt_event(self):\r
+ self.expt = True\r
+\r
+ def handle_error(self):\r
+ self.error_handled = True\r
+\r
+ for flag in (select.POLLIN, select.POLLPRI):\r
+ tobj = testobj()\r
+ self.assertEqual(tobj.read, False)\r
+ asyncore.readwrite(tobj, flag)\r
+ self.assertEqual(tobj.read, True)\r
+\r
+ # check that ExitNow exceptions in the object handler method\r
+ # bubbles all the way up through asyncore readwrite call\r
+ tr1 = exitingdummy()\r
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)\r
+\r
+ # check that an exception other than ExitNow in the object handler\r
+ # method causes the handle_error method to get called\r
+ tr2 = crashingdummy()\r
+ asyncore.readwrite(tr2, flag)\r
+ self.assertEqual(tr2.error_handled, True)\r
+\r
+ tobj = testobj()\r
+ self.assertEqual(tobj.write, False)\r
+ asyncore.readwrite(tobj, select.POLLOUT)\r
+ self.assertEqual(tobj.write, True)\r
+\r
+ # check that ExitNow exceptions in the object handler method\r
+ # bubbles all the way up through asyncore readwrite call\r
+ tr1 = exitingdummy()\r
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,\r
+ select.POLLOUT)\r
+\r
+ # check that an exception other than ExitNow in the object handler\r
+ # method causes the handle_error method to get called\r
+ tr2 = crashingdummy()\r
+ asyncore.readwrite(tr2, select.POLLOUT)\r
+ self.assertEqual(tr2.error_handled, True)\r
+\r
+ for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):\r
+ tobj = testobj()\r
+ self.assertEqual(tobj.expt, False)\r
+ asyncore.readwrite(tobj, flag)\r
+ self.assertEqual(tobj.expt, True)\r
+\r
+ # check that ExitNow exceptions in the object handler method\r
+ # bubbles all the way up through asyncore readwrite calls\r
+ tr1 = exitingdummy()\r
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)\r
+\r
+ # check that an exception other than ExitNow in the object handler\r
+ # method causes the handle_error method to get called\r
+ tr2 = crashingdummy()\r
+ asyncore.readwrite(tr2, flag)\r
+ self.assertEqual(tr2.error_handled, True)\r
+\r
+ def test_closeall(self):\r
+ self.closeall_check(False)\r
+\r
+ def test_closeall_default(self):\r
+ self.closeall_check(True)\r
+\r
+ def closeall_check(self, usedefault):\r
+ # Check that close_all() closes everything in a given map\r
+\r
+ l = []\r
+ testmap = {}\r
+ for i in range(10):\r
+ c = dummychannel()\r
+ l.append(c)\r
+ self.assertEqual(c.socket.closed, False)\r
+ testmap[i] = c\r
+\r
+ if usedefault:\r
+ socketmap = asyncore.socket_map\r
+ try:\r
+ asyncore.socket_map = testmap\r
+ asyncore.close_all()\r
+ finally:\r
+ testmap, asyncore.socket_map = asyncore.socket_map, socketmap\r
+ else:\r
+ asyncore.close_all(testmap)\r
+\r
+ self.assertEqual(len(testmap), 0)\r
+\r
+ for c in l:\r
+ self.assertEqual(c.socket.closed, True)\r
+\r
+ def test_compact_traceback(self):\r
+ try:\r
+ raise Exception("I don't like spam!")\r
+ except:\r
+ real_t, real_v, real_tb = sys.exc_info()\r
+ r = asyncore.compact_traceback()\r
+ else:\r
+ self.fail("Expected exception")\r
+\r
+ (f, function, line), t, v, info = r\r
+ self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')\r
+ self.assertEqual(function, 'test_compact_traceback')\r
+ self.assertEqual(t, real_t)\r
+ self.assertEqual(v, real_v)\r
+ self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))\r
+\r
+\r
+class DispatcherTests(unittest.TestCase):\r
+ def setUp(self):\r
+ pass\r
+\r
+ def tearDown(self):\r
+ asyncore.close_all()\r
+\r
+ def test_basic(self):\r
+ d = asyncore.dispatcher()\r
+ self.assertEqual(d.readable(), True)\r
+ self.assertEqual(d.writable(), True)\r
+\r
+ def test_repr(self):\r
+ d = asyncore.dispatcher()\r
+ self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))\r
+\r
+ def test_log(self):\r
+ d = asyncore.dispatcher()\r
+\r
+ # capture output of dispatcher.log() (to stderr)\r
+ fp = StringIO()\r
+ stderr = sys.stderr\r
+ l1 = "Lovely spam! Wonderful spam!"\r
+ l2 = "I don't like spam!"\r
+ try:\r
+ sys.stderr = fp\r
+ d.log(l1)\r
+ d.log(l2)\r
+ finally:\r
+ sys.stderr = stderr\r
+\r
+ lines = fp.getvalue().splitlines()\r
+ self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])\r
+\r
+ def test_log_info(self):\r
+ d = asyncore.dispatcher()\r
+\r
+ # capture output of dispatcher.log_info() (to stdout via print)\r
+ fp = StringIO()\r
+ stdout = sys.stdout\r
+ l1 = "Have you got anything without spam?"\r
+ l2 = "Why can't she have egg bacon spam and sausage?"\r
+ l3 = "THAT'S got spam in it!"\r
+ try:\r
+ sys.stdout = fp\r
+ d.log_info(l1, 'EGGS')\r
+ d.log_info(l2)\r
+ d.log_info(l3, 'SPAM')\r
+ finally:\r
+ sys.stdout = stdout\r
+\r
+ lines = fp.getvalue().splitlines()\r
+ if __debug__:\r
+ expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]\r
+ else:\r
+ expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3]\r
+\r
+ self.assertEquals(lines, expected)\r
+\r
+ def test_unhandled(self):\r
+ d = asyncore.dispatcher()\r
+\r
+ # capture output of dispatcher.log_info() (to stdout via print)\r
+ fp = StringIO()\r
+ stdout = sys.stdout\r
+ try:\r
+ sys.stdout = fp\r
+ d.handle_expt()\r
+ d.handle_read()\r
+ d.handle_write()\r
+ d.handle_connect()\r
+ d.handle_accept()\r
+ finally:\r
+ sys.stdout = stdout\r
+\r
+ lines = fp.getvalue().splitlines()\r
+ expected = ['warning: unhandled exception',\r
+ 'warning: unhandled read event',\r
+ 'warning: unhandled write event',\r
+ 'warning: unhandled connect event',\r
+ 'warning: unhandled accept event']\r
+ self.assertEquals(lines, expected)\r
+\r
+\r
+\r
+class dispatcherwithsend_noread(asyncore.dispatcher_with_send):\r
+ def readable(self):\r
+ return False\r
+\r
+ def handle_connect(self):\r
+ pass\r
+\r
+class DispatcherWithSendTests(unittest.TestCase):\r
+ usepoll = False\r
+\r
+ def setUp(self):\r
+ pass\r
+\r
+ def tearDown(self):\r
+ asyncore.close_all()\r
+\r
+ def test_send(self):\r
+ self.evt = threading.Event()\r
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\r
+ self.sock.settimeout(3)\r
+ self.port = test_support.bind_port(self.sock)\r
+\r
+ cap = StringIO()\r
+ args = (self.evt, cap, self.sock)\r
+ threading.Thread(target=capture_server, args=args).start()\r
+\r
+ # wait a little longer for the server to initialize (it sometimes\r
+ # refuses connections on slow machines without this wait)\r
+ time.sleep(0.2)\r
+\r
+ data = "Suppose there isn't a 16-ton weight?"\r
+ d = dispatcherwithsend_noread()\r
+ d.create_socket(socket.AF_INET, socket.SOCK_STREAM)\r
+ d.connect((HOST, self.port))\r
+\r
+ # give time for socket to connect\r
+ time.sleep(0.1)\r
+\r
+ d.send(data)\r
+ d.send(data)\r
+ d.send('\n')\r
+\r
+ n = 1000\r
+ while d.out_buffer and n > 0:\r
+ asyncore.poll()\r
+ n -= 1\r
+\r
+ self.evt.wait()\r
+\r
+ self.assertEqual(cap.getvalue(), data*2)\r
+\r
+\r
+class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):\r
+ usepoll = True\r
+\r
+if hasattr(asyncore, 'file_wrapper'):\r
+ class FileWrapperTest(unittest.TestCase):\r
+ def setUp(self):\r
+ self.d = "It's not dead, it's sleeping!"\r
+ file(TESTFN, 'w').write(self.d)\r
+\r
+ def tearDown(self):\r
+ unlink(TESTFN)\r
+\r
+ def test_recv(self):\r
+ fd = os.open(TESTFN, os.O_RDONLY)\r
+ w = asyncore.file_wrapper(fd)\r
+\r
+ self.assertEqual(w.fd, fd)\r
+ self.assertEqual(w.fileno(), fd)\r
+ self.assertEqual(w.recv(13), "It's not dead")\r
+ self.assertEqual(w.read(6), ", it's")\r
+ w.close()\r
+ self.assertRaises(OSError, w.read, 1)\r
+\r
+ def test_send(self):\r
+ d1 = "Come again?"\r
+ d2 = "I want to buy some cheese."\r
+ fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)\r
+ w = asyncore.file_wrapper(fd)\r
+\r
+ w.write(d1)\r
+ w.send(d2)\r
+ w.close()\r
+ self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)\r
+\r
+\r
+def test_main():\r
+ tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,\r
+ DispatcherWithSendTests_UsePoll]\r
+ if hasattr(asyncore, 'file_wrapper'):\r
+ tests.append(FileWrapperTest)\r
+\r
+ run_unittest(*tests)\r
+\r
+if __name__ == "__main__":\r
+ test_main()\r
from unittest import TestCase
from test import test_support
-server_port = None
+HOST = test_support.HOST
# This function sets the evt 3 times:
# 1) when the connection is ready to be accepted.
# 2) when it is safe for the caller to close the connection
# 3) when we have closed the socket
-def server(evt):
- global server_port
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_port = test_support.bind_port(serv, "", 9091)
+def server(evt, serv):
serv.listen(5)
# (1) Signal the caller that we are ready to accept the connection.
evt.set()
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
# Wait for the server to be ready.
self.evt.wait()
self.evt.clear()
- ftplib.FTP.port = server_port
+ ftplib.FTP.port = self.port
def tearDown(self):
- # Wait on the closing of the socket (this shouldn't be necessary).
self.evt.wait()
def testBasic(self):
ftplib.FTP()
# connects
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
self.evt.wait()
ftp.sock.close()
def testTimeoutDefault(self):
# default
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
self.assertTrue(ftp.sock.gettimeout() is None)
self.evt.wait()
ftp.sock.close()
def testTimeoutValue(self):
# a value
- ftp = ftplib.FTP("localhost", timeout=30)
+ ftp = ftplib.FTP(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutConnect(self):
ftp = ftplib.FTP()
- ftp.connect("localhost", timeout=30)
+ ftp.connect(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutDifferentOrder(self):
ftp = ftplib.FTP(timeout=30)
- ftp.connect("localhost")
+ ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutDirectAccess(self):
ftp = ftplib.FTP()
ftp.timeout = 30
- ftp.connect("localhost")
+ ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- ftp = ftplib.FTP("localhost", timeout=None)
+ ftp = ftplib.FTP(HOST, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(ftp.sock.gettimeout(), 30)
from test import test_support
+HOST = test_support.HOST
+
class FakeSocket:
def __init__(self, text, fileclass=StringIO.StringIO):
self.text = text
def test_responses(self):
self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found")
-PORT = 50003
-HOST = "localhost"
-
class TimeoutTest(TestCase):
+ PORT = None
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.PORT = test_support.bind_port(self.serv)
self.serv.listen(5)
def tearDown(self):
HTTPConnection and into the socket.
'''
# default
- httpConn = httplib.HTTPConnection(HOST, PORT)
+ httpConn = httplib.HTTPConnection(HOST, self.PORT)
httpConn.connect()
self.assertTrue(httpConn.sock.gettimeout() is None)
httpConn.close()
# a value
- httpConn = httplib.HTTPConnection(HOST, PORT, timeout=30)
+ httpConn = httplib.HTTPConnection(HOST, self.PORT, timeout=30)
httpConn.connect()
self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close()
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- httpConn = httplib.HTTPConnection(HOST, PORT, timeout=None)
+ httpConn = httplib.HTTPConnection(HOST, self.PORT, timeout=None)
httpConn.connect()
finally:
socket.setdefaulttimeout(previous)
def test_attributes(self):
# simple test to check it's storing it
if hasattr(httplib, 'HTTPSConnection'):
- h = httplib.HTTPSConnection(HOST, PORT, timeout=30)
+ h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
self.assertEqual(h.timeout, 30)
def test_main(verbose=None):
- test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTimeoutTest)
+ test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
+ HTTPSTimeoutTest)
if __name__ == '__main__':
test_main()
from unittest import TestCase
from test import test_support
+HOST = test_support.HOST
-def server(evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 9091))
+def server(evt, serv):
serv.listen(5)
try:
conn, addr = serv.accept()
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
time.sleep(.1)
def tearDown(self):
def testBasic(self):
# connects
- pop = poplib.POP3("localhost", 9091)
+ pop = poplib.POP3(HOST, self.port)
pop.sock.close()
def testTimeoutDefault(self):
# default
- pop = poplib.POP3("localhost", 9091)
+ pop = poplib.POP3(HOST, self.port)
self.assertTrue(pop.sock.gettimeout() is None)
pop.sock.close()
def testTimeoutValue(self):
# a value
- pop = poplib.POP3("localhost", 9091, timeout=30)
+ pop = poplib.POP3(HOST, self.port, timeout=30)
self.assertEqual(pop.sock.gettimeout(), 30)
pop.sock.close()
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- pop = poplib.POP3("localhost", 9091, timeout=None)
+ pop = poplib.POP3(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(pop.sock.gettimeout(), 30)
from unittest import TestCase
from test import test_support
-# PORT is used to communicate the port number assigned to the server
-# to the test client
-HOST = "localhost"
-PORT = None
-
-def server(evt, buf):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(15)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
+HOST = test_support.HOST
+
+def server(evt, buf, serv):
serv.listen(5)
evt.set()
try:
conn.close()
finally:
serv.close()
- PORT = None
evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- servargs = (self.evt, "220 Hola mundo\n")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(15)
+ self.port = test_support.bind_port(self.sock)
+ servargs = (self.evt, "220 Hola mundo\n", self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
def testBasic1(self):
# connects
- smtp = smtplib.SMTP(HOST, PORT)
+ smtp = smtplib.SMTP(HOST, self.port)
smtp.sock.close()
def testBasic2(self):
# connects, include port in host name
- smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
+ smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
smtp.sock.close()
def testLocalHostName(self):
# check that supplied local_hostname is used
- smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
def testTimeoutDefault(self):
# default
- smtp = smtplib.SMTP(HOST, PORT)
+ smtp = smtplib.SMTP(HOST, self.port)
self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close()
def testTimeoutValue(self):
# a value
- smtp = smtplib.SMTP(HOST, PORT, timeout=30)
+ smtp = smtplib.SMTP(HOST, self.port, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- smtp = smtplib.SMTP(HOST, PORT, timeout=None)
+ smtp = smtplib.SMTP(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30)
# Test server thread using the specified SMTP server class
-def debugging_server(server_class, serv_evt, client_evt):
- serv = server_class(("", 0), ('nowhere', -1))
- global PORT
- PORT = serv.getsockname()[1]
+def debugging_server(serv, serv_evt, client_evt):
serv_evt.set()
try:
time.sleep(0.5)
serv.close()
asyncore.close_all()
- PORT = None
serv_evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
- serv_args = (smtpd.DebuggingServer, self.serv_evt, self.client_evt)
+ self.port = test_support.find_unused_port()
+ self.serv = smtpd.DebuggingServer((HOST, self.port), ('nowhere', -1))
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
def testBasic(self):
# connect
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.quit()
def testNOOP(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, 'Ok')
self.assertEqual(smtp.noop(), expected)
smtp.quit()
def testRSET(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, 'Ok')
self.assertEqual(smtp.rset(), expected)
smtp.quit()
def testNotImplemented(self):
# EHLO isn't implemented in DebuggingServer
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, 'Error: command "EHLO" not implemented')
self.assertEqual(smtp.ehlo(), expected)
smtp.quit()
def testVRFY(self):
# VRFY isn't implemented in DebuggingServer
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, 'Error: command "VRFY" not implemented')
self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
def testSecondHELO(self):
# check that a second HELO returns a message that it's a duplicate
# (this behavior is specific to smtpd.SMTPChannel)
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.helo()
expected = (503, 'Duplicate HELO/EHLO')
self.assertEqual(smtp.helo(), expected)
smtp.quit()
def testHELP(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
smtp.quit()
def testSend(self):
# connect and send mail
m = 'A test message'
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.sendmail('John', 'Sally', m)
smtp.quit()
sys.stdout = self.output
self.evt = threading.Event()
- servargs = (self.evt, "199 no hello for you!\n")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(15)
+ self.port = test_support.bind_port(self.sock)
+ servargs = (self.evt, "199 no hello for you!\n", self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
def testFailingHELO(self):
self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
- HOST, PORT, 'localhost', 3)
+ HOST, self.port, 'localhost', 3)
sim_users = {'Mr.A@somewhere.com':'John A',
def setUp(self):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
- serv_args = (SimSMTPServer, self.serv_evt, self.client_evt)
+ self.port = test_support.find_unused_port()
+ self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1))
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
def testBasic(self):
# smoke test
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
smtp.quit()
def testEHLO(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
# no features should be present before the EHLO
self.assertEqual(smtp.esmtp_features, {})
smtp.quit()
def testVRFY(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for email, name in sim_users.items():
expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email)))
smtp.quit()
def testEXPN(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for listname, members in sim_lists.items():
users = []
import unittest
from test import test_support
+import errno
import socket
import select
import thread, threading
from weakref import proxy
import signal
-PORT = 50007
-HOST = 'localhost'
+HOST = test_support.HOST
MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.port = test_support.bind_port(self.serv)
self.serv.listen(1)
def tearDown(self):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.port = test_support.bind_port(self.serv)
def tearDown(self):
self.serv.close()
def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
self.serv_conn = self.cli
def clientTearDown(self):
# XXX The following don't test module-level functionality...
def testSockName(self):
- # Testing getsockname()
+ # Testing getsockname(). Use a temporary socket to elicit an unused
+ # ephemeral port that we can use later in the test.
+ tempsock = socket.socket()
+ tempsock.bind(("0.0.0.0", 0))
+ (host, port) = tempsock.getsockname()
+ tempsock.close()
+ del tempsock
+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(("0.0.0.0", PORT+1))
+ sock.bind(("0.0.0.0", port))
name = sock.getsockname()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass.
my_ip_addr = socket.gethostbyname(socket.gethostname())
self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
- self.assertEqual(name[1], PORT+1)
+ self.assertEqual(name[1], port)
def testGetSockOpt(self):
# Testing getsockopt()
self.assertEqual(msg, MSG)
def _testSendtoAndRecv(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFrom(self):
# Testing recvfrom() over UDP
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFromNegative(self):
# Negative lengths passed to recvfrom should give ValueError.
self.assertRaises(ValueError, self.serv.recvfrom, -1)
def _testRecvFromNegative(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
class TCPCloserTest(ThreadedTCPSocketTest):
self.assertEqual(sd.recv(1), '')
def _testClose(self):
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest):
def _testAccept(self):
time.sleep(0.1)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
def testConnect(self):
# Testing non-blocking connect
def _testConnect(self):
self.cli.settimeout(10)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
def testRecv(self):
# Testing non-blocking recv
self.fail("Error during select call to non-blocking socket.")
def _testRecv(self):
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
time.sleep(0.1)
self.cli.send(MSG)
class NetworkConnectionTest(object):
"""Prove network connection."""
def clientSetUp(self):
- self.cli = socket.create_connection((HOST, PORT))
+ # We're inherited below by BasicTCPTest2, which also inherits
+ # BasicTCPTest, which defines self.port referenced below.
+ self.cli = socket.create_connection((HOST, self.port))
self.serv_conn = self.cli
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
class NetworkConnectionNoServer(unittest.TestCase):
def testWithoutServer(self):
- self.failUnlessRaises(socket.error, lambda: socket.create_connection((HOST, PORT)))
+ port = test_support.find_unused_port()
+ self.failUnlessRaises(
+ socket.error,
+ lambda: socket.create_connection((HOST, port))
+ )
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
testFamily = _justAccept
def _testFamily(self):
- self.cli = socket.create_connection((HOST, PORT), timeout=30)
+ self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.family, 2)
testTimeoutDefault = _justAccept
def _testTimeoutDefault(self):
- self.cli = socket.create_connection((HOST, PORT))
+ self.cli = socket.create_connection((HOST, self.port))
self.assertTrue(self.cli.gettimeout() is None)
testTimeoutValueNamed = _justAccept
def _testTimeoutValueNamed(self):
- self.cli = socket.create_connection((HOST, PORT), timeout=30)
+ self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutValueNonamed = _justAccept
def _testTimeoutValueNonamed(self):
- self.cli = socket.create_connection((HOST, PORT), 30)
+ self.cli = socket.create_connection((HOST, self.port), 30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutNone = _justAccept
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- self.cli = socket.create_connection((HOST, PORT), timeout=None)
+ self.cli = socket.create_connection((HOST, self.port), timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(self.cli.gettimeout(), 30)
testOutsideTimeout = testInsideTimeout
def _testInsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, PORT))
+ self.cli = sock = socket.create_connection((HOST, self.port))
data = sock.recv(5)
self.assertEqual(data, "done!")
def _testOutsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, PORT), timeout=1)
+ self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))
# Optionally test SSL support, if we have it in the tested platform
skip_expected = not hasattr(socket, "ssl")
+HOST = test_support.HOST
class ConnectedTests(unittest.TestCase):
class BasicTests(unittest.TestCase):
def testRudeShutdown(self):
- # Some random port to connect to.
- PORT = [9934]
-
listener_ready = threading.Event()
listener_gone = threading.Event()
-
- # `listener` runs in a thread. It opens a socket listening on
- # PORT, and sits in an accept() until the main thread connects.
- # Then it rudely closes the socket, and sets Event `listener_gone`
- # to let the main thread know the socket is gone.
- def listener():
- s = socket.socket()
- PORT[0] = test_support.bind_port(s, '', PORT[0])
+ sock = socket.socket()
+ port = test_support.bind_port(sock)
+
+ # `listener` runs in a thread. It opens a socket and sits in accept()
+ # until the main thread connects. Then it rudely closes the socket,
+ # and sets Event `listener_gone` to let the main thread know the socket
+ # is gone.
+ def listener(s):
s.listen(5)
listener_ready.set()
s.accept()
def connector():
listener_ready.wait()
s = socket.socket()
- s.connect(('localhost', PORT[0]))
+ s.connect((HOST, port))
listener_gone.wait()
try:
ssl_sock = socket.ssl(s)
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
- t = threading.Thread(target=listener)
+ t = threading.Thread(target=listener, args=(sock,))
t.start()
connector()
t.join()
def testBasic(self):
s = socket.socket()
- s.connect(("localhost", 4433))
+ s.connect((HOST, OpenSSLServer.PORT))
ss = socket.ssl(s)
ss.write("Foo\n")
i = ss.read(4)
info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi"
s = socket.socket()
- s.connect(("localhost", 4433))
+ s.connect((HOST, OpenSSLServer.PORT))
ss = socket.ssl(s)
cert = ss.server()
self.assertEqual(cert, info)
class OpenSSLServer(threading.Thread):
+ PORT = None
def __init__(self):
self.s = None
self.keepServing = True
raise ValueError("No key file found! (tried %r)" % key_file)
try:
- cmd = "openssl s_server -cert %s -key %s -quiet" % (cert_file, key_file)
+ # XXX TODO: on Windows, this should make more effort to use the
+ # openssl.exe that would have been built by the pcbuild.sln.
+ self.PORT = test_support.find_unused_port()
+ args = (self.PORT, cert_file, key_file)
+ cmd = "openssl s_server -accept %d -cert %s -key %s -quiet" % args
self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
# let's try if it is actually up
try:
s = socket.socket()
- s.connect(("localhost", 4433))
+ s.connect((HOST, self.PORT))
s.close()
if self.s.stdout.readline() != "ERROR\n":
raise ValueError
test.test_support.requires("network")
TEST_STR = "hello world\n"
-HOST = "localhost"
+HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
except ImportError:
skip_expected = True
+HOST = test_support.HOST
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
-TESTPORT = 10025
-
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
if test_support.verbose:
except:
handle_error('')
- def __init__(self, port, certificate, ssl_version=None,
+ def __init__(self, certificate, ssl_version=None,
certreqs=None, cacerts=None, expect_bad_connects=False,
chatty=True, connectionchatty=False, starttls_server=False):
if ssl_version is None:
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
+ self.port = test_support.bind_port(self.sock)
self.flag = None
- if hasattr(socket, 'SO_REUSEADDR'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- self.sock.bind(('127.0.0.1', port))
self.active = False
threading.Thread.__init__(self)
self.setDaemon(False)
format%args))
- def __init__(self, port, certfile):
+ def __init__(self, certfile):
self.flag = None
self.active = False
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
+ self.port = test_support.find_unused_port()
self.server = self.HTTPSServer(
- ('', port), self.RootedHTTPRequestHandler, certfile)
+ (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
threading.Thread.__init__(self)
self.setDaemon(True)
def badCertTest (certfile):
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_REQUIRED,
cacerts=CERTFILE, chatty=False)
flag = threading.Event()
s = ssl.wrap_socket(socket.socket(),
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError, x:
if test_support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x[1])
client_certfile, client_protocol=None, indata="FOO\n",
chatty=True, connectionchatty=False):
- server = ThreadedEchoServer(TESTPORT, certfile,
+ server = ThreadedEchoServer(certfile,
certreqs=certreqs,
ssl_version=protocol,
cacerts=cacertsfile,
ca_certs=cacertsfile,
cert_reqs=certreqs,
ssl_version=client_protocol)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError, x:
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
except Exception, x:
listener_ready = threading.Event()
listener_gone = threading.Event()
+ port = test_support.find_unused_port()
# `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects.
# to let the main thread know the socket is gone.
def listener():
s = socket.socket()
- if hasattr(socket, 'SO_REUSEADDR'):
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- s.bind(('127.0.0.1', TESTPORT))
+ s.bind((HOST, port))
s.listen(5)
listener_ready.set()
s.accept()
def connector():
listener_ready.wait()
s = socket.socket()
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, port))
listener_gone.wait()
try:
ssl_sock = ssl.wrap_socket(s)
if test_support.verbose:
sys.stdout.write("\n")
s2 = socket.socket()
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_SSLv23,
cacerts=CERTFILE,
ca_certs=CERTFILE,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_SSLv23)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError, x:
raise test_support.TestFailed(
"Unexpected SSL error: " + str(x))
msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
ssl_version=ssl.PROTOCOL_TLSv1,
starttls_server=True,
chatty=True,
try:
s = socket.socket()
s.setblocking(1)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except Exception, x:
raise test_support.TestFailed("Unexpected exception: " + str(x))
else:
def testAsyncore(self):
- server = AsyncoreHTTPSServer(TESTPORT, CERTFILE)
+ server = AsyncoreHTTPSServer(CERTFILE)
flag = threading.Event()
server.start(flag)
# wait for it to start
d1 = open(CERTFILE, 'rb').read()
d2 = ''
# now fetch the same data from the HTTPS server
- url = 'https://127.0.0.1:%d/%s' % (
- TESTPORT, os.path.split(CERTFILE)[1])
+ url = 'https://%s:%d/%s' % (
+ HOST, server.port, os.path.split(CERTFILE)[1])
f = urllib.urlopen(url)
dlen = f.info().getheader("content-length")
if dlen and (int(dlen) > 0):
server.join()
-def findtestsocket(start, end):
- def testbind(i):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- s.bind(("127.0.0.1", i))
- except:
- return 0
- else:
- return 1
- finally:
- s.close()
-
- for i in range(start, end):
- if testbind(i) and testbind(i+1):
- return i
- return 0
-
-
def test_main(verbose=False):
if skip_expected:
raise test_support.TestSkipped("No SSL support")
- global CERTFILE, TESTPORT, SVN_PYTHON_ORG_ROOT_CERT
+ global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
if (not os.path.exists(CERTFILE) or
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
raise test_support.TestFailed("Can't read certificate files!")
- TESTPORT = findtestsocket(10025, 12000)
- if not TESTPORT:
- raise test_support.TestFailed("Can't find open port to test servers on!")
tests = [BasicTests]
msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg)
-def bind_port(sock, host='', preferred_port=54321):
- """Try to bind the sock to a port. If we are running multiple
- tests and we don't try multiple ports, the test can fail. This
- makes the test more robust."""
-
- # Find some random ports that hopefully no one is listening on.
- # Ideally each test would clean up after itself and not continue listening
- # on any ports. However, this isn't the case. The last port (0) is
- # a stop-gap that asks the O/S to assign a port. Whenever the warning
- # message below is printed, the test that is listening on the port should
- # be fixed to close the socket at the end of the test.
- # Another reason why we can't use a port is another process (possibly
- # another instance of the test suite) is using the same port.
- for port in [preferred_port, 9907, 10243, 32999, 0]:
- try:
- sock.bind((host, port))
- if port == 0:
- port = sock.getsockname()[1]
- return port
- except socket.error, (err, msg):
- if err != errno.EADDRINUSE:
- raise
- print >>sys.__stderr__, \
- ' WARNING: failed to listen on port %d, trying another' % port
- raise TestFailed('unable to find port to listen on')
+HOST = 'localhost'
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the same family and type as
+ the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+ the specified host address (defaults to 0.0.0.0) with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ http://bugs.python.org/issue2550 for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it."""
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the sock.family
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise TestFailed("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise TestFailed("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
FUZZ = 1e-6
from unittest import TestCase
from test import test_support
-PORT = 9091
+HOST = test_support.HOST
-def server(evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(serv, "", PORT)
+def server(evt, serv):
serv.listen(5)
evt.set()
try:
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
self.evt.wait()
self.evt.clear()
time.sleep(.1)
def testBasic(self):
# connects
- telnet = telnetlib.Telnet("localhost", PORT)
+ telnet = telnetlib.Telnet(HOST, self.port)
telnet.sock.close()
def testTimeoutDefault(self):
# default
- telnet = telnetlib.Telnet("localhost", PORT)
+ telnet = telnetlib.Telnet(HOST, self.port)
self.assertTrue(telnet.sock.gettimeout() is None)
telnet.sock.close()
def testTimeoutValue(self):
# a value
- telnet = telnetlib.Telnet("localhost", PORT, timeout=30)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def testTimeoutDifferentOrder(self):
telnet = telnetlib.Telnet(timeout=30)
- telnet.open("localhost", PORT)
+ telnet.open(HOST, self.port)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- telnet = telnetlib.Telnet("localhost", PORT, timeout=None)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(telnet.sock.gettimeout(), 30)
Tests
-----
+- Issue #2550: The approach used by client/server code for obtaining ports
+ to listen on in network-oriented tests has been refined in an effort to
+ facilitate running multiple instances of the entire regression test suite
+ in parallel without issue. test_support.bind_port() has been fixed such
+ that it will always return a unique port -- which wasn't always the case
+ with the previous implementation, especially if socket options had been
+ set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
+ new implementation of bind_port() will actually raise an exception if it
+ is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
+ SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
+ will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
+ This currently only applies to Windows. This option prevents any other
+ sockets from binding to the host/port we've bound to, thus removing the
+ possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
+ that occurs when a second SOCK_STREAM socket binds and accepts to a
+ host/port that's already been bound by another socket. The optional
+ preferred port parameter to bind_port() has been removed. Under no
+ circumstances should tests be hard coding ports!
+
+ test_support.find_unused_port() has also been introduced, which will pass
+ a temporary socket object to bind_port() in order to obtain an unused port.
+ The temporary socket object is then closed and deleted, and the port is
+ returned. This method should only be used for obtaining an unused port
+ in order to pass to an external program (i.e. the -accept [port] argument
+ to openssl's s_server mode) or as a parameter to a server-oriented class
+ that doesn't give you direct access to the underlying socket used.
+
+ Finally, test_support.HOST has been introduced, which should be used for
+ the host argument of any relevant socket calls (i.e. bind and connect).
+
+ The following tests were updated to following the new conventions:
+ test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
+ test_poplib, test_ftplib, test_telnetlib, test_socketserver,
+ test_asynchat and test_socket_ssl.
+
+ It is now possible for multiple instances of the regression test suite to
+ run in parallel without issue.
Build
-----