import thread # If this fails, we can't test this module
import asyncore, asynchat, socket, threading, time
import unittest
+import sys
from test import test_support
HOST = "127.0.0.1"
def handle_connect(self):
pass
- ##print "Connected"
+
+ if sys.platform == 'darwin':
+ # select.poll returns a select.POLLHUP at the end of the tests
+ # on darwin, so just ignore it
+ def handle_expt(self):
+ pass
def collect_incoming_data(self, data):
self.buffer += data
c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term)
c.push(SERVER_QUIT)
- asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
data = "hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
- asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, [data[:termlen]])
data = "hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
- asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, [])
data = "hello world\nI'm not dead yet!\n"
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
c.push_with_producer(p)
- asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
c = echo_client('\n')
data = "hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT)
- asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
c = echo_client('\n')
c.push("hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
- asyncore.loop(use_poll=self.usepoll)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
- asyncore.loop(use_poll=self.usepoll)#, count=5, timeout=5)
+ asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
s.join()
self.assertEqual(c.contents, [])