import tempfile
import threading
import time
+import unittest
from unittest import mock
from http.server import HTTPServer
if source is None:
raise ValueError("unable to get the source of %r" % (func,))
return source
+
+
+class TestCase(unittest.TestCase):
+ def set_event_loop(self, loop, *, cleanup=True):
+ assert loop is not None
+ # ensure that the event loop is passed explicitly in asyncio
+ events.set_event_loop(None)
+ if cleanup:
+ self.addCleanup(loop.close)
+
+ def new_test_loop(self, gen=None):
+ loop = TestLoop(gen)
+ self.set_event_loop(loop)
+ return loop
+
+ def tearDown(self):
+ events.set_event_loop(None)
PY34 = sys.version_info >= (3, 4)
-class BaseEventLoopTests(unittest.TestCase):
+class BaseEventLoopTests(test_utils.TestCase):
def setUp(self):
self.loop = base_events.BaseEventLoop()
self.loop._selector = mock.Mock()
- asyncio.set_event_loop(None)
+ self.set_event_loop(self.loop)
def test_not_implemented(self):
m = mock.Mock()
self.done.set_result(None)
-class BaseEventLoopWithSelectorTests(unittest.TestCase):
+class BaseEventLoopWithSelectorTests(test_utils.TestCase):
def setUp(self):
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.set_event_loop(self.loop)
@mock.patch('asyncio.base_events.socket')
def test_create_connection_multiple_errors(self, m_socket):
def setUp(self):
super().setUp()
self.loop = self.create_event_loop()
- asyncio.set_event_loop(None)
+ self.set_event_loop(self.loop)
def tearDown(self):
# just in case if we have transport close callbacks
if sys.platform == 'win32':
- class SelectEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
+ class SelectEventLoopTests(EventLoopTestsMixin, test_utils.TestCase):
def create_event_loop(self):
return asyncio.SelectorEventLoop()
class ProactorEventLoopTests(EventLoopTestsMixin,
SubprocessTestsMixin,
- unittest.TestCase):
+ test_utils.TestCase):
def create_event_loop(self):
return asyncio.ProactorEventLoop()
if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(UnixEventLoopTestsMixin,
SubprocessTestsMixin,
- unittest.TestCase):
+ test_utils.TestCase):
def create_event_loop(self):
return asyncio.SelectorEventLoop(
if hasattr(selectors, 'EpollSelector'):
class EPollEventLoopTests(UnixEventLoopTestsMixin,
SubprocessTestsMixin,
- unittest.TestCase):
+ test_utils.TestCase):
def create_event_loop(self):
return asyncio.SelectorEventLoop(selectors.EpollSelector())
if hasattr(selectors, 'PollSelector'):
class PollEventLoopTests(UnixEventLoopTestsMixin,
SubprocessTestsMixin,
- unittest.TestCase):
+ test_utils.TestCase):
def create_event_loop(self):
return asyncio.SelectorEventLoop(selectors.PollSelector())
# Should always exist.
class SelectEventLoopTests(UnixEventLoopTestsMixin,
SubprocessTestsMixin,
- unittest.TestCase):
+ test_utils.TestCase):
def create_event_loop(self):
return asyncio.SelectorEventLoop(selectors.SelectSelector())
return f
-class FutureTests(unittest.TestCase):
+class FutureTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.loop = self.new_test_loop()
def test_initial_state(self):
f = asyncio.Future(loop=self.loop)
self.assertTrue(f.cancelled())
def test_init_constructor_default_loop(self):
- try:
- asyncio.set_event_loop(self.loop)
- f = asyncio.Future()
- self.assertIs(f._loop, self.loop)
- finally:
- asyncio.set_event_loop(None)
+ asyncio.set_event_loop(self.loop)
+ f = asyncio.Future()
+ self.assertIs(f._loop, self.loop)
def test_constructor_positional(self):
# Make sure Future doesn't accept a positional argument
self.assertTrue(f2.cancelled())
-class FutureDoneCallbackTests(unittest.TestCase):
+class FutureDoneCallbackTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.loop = self.new_test_loop()
def run_briefly(self):
test_utils.run_briefly(self.loop)
RGX_REPR = re.compile(STR_RGX_REPR)
-class LockTests(unittest.TestCase):
+class LockTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.loop = self.new_test_loop()
def test_ctor_loop(self):
loop = mock.Mock()
self.assertIs(lock._loop, self.loop)
def test_ctor_noloop(self):
- try:
- asyncio.set_event_loop(self.loop)
- lock = asyncio.Lock()
- self.assertIs(lock._loop, self.loop)
- finally:
- asyncio.set_event_loop(None)
+ asyncio.set_event_loop(self.loop)
+ lock = asyncio.Lock()
+ self.assertIs(lock._loop, self.loop)
def test_repr(self):
lock = asyncio.Lock(loop=self.loop)
self.assertFalse(lock.locked())
-class EventTests(unittest.TestCase):
+class EventTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.loop = self.new_test_loop()
def test_ctor_loop(self):
loop = mock.Mock()
self.assertIs(ev._loop, self.loop)
def test_ctor_noloop(self):
- try:
- asyncio.set_event_loop(self.loop)
- ev = asyncio.Event()
- self.assertIs(ev._loop, self.loop)
- finally:
- asyncio.set_event_loop(None)
+ asyncio.set_event_loop(self.loop)
+ ev = asyncio.Event()
+ self.assertIs(ev._loop, self.loop)
def test_repr(self):
ev = asyncio.Event(loop=self.loop)
self.assertTrue(t.result())
-class ConditionTests(unittest.TestCase):
+class ConditionTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.loop = self.new_test_loop()
def test_ctor_loop(self):
loop = mock.Mock()
self.assertIs(cond._loop, self.loop)
def test_ctor_noloop(self):
- try:
- asyncio.set_event_loop(self.loop)
- cond = asyncio.Condition()
- self.assertIs(cond._loop, self.loop)
- finally:
- asyncio.set_event_loop(None)
+ asyncio.set_event_loop(self.loop)
+ cond = asyncio.Condition()
+ self.assertIs(cond._loop, self.loop)
def test_wait(self):
cond = asyncio.Condition(loop=self.loop)
self.assertFalse(cond.locked())
-class SemaphoreTests(unittest.TestCase):
+class SemaphoreTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.loop = self.new_test_loop()
def test_ctor_loop(self):
loop = mock.Mock()
self.assertIs(sem._loop, self.loop)
def test_ctor_noloop(self):
- try:
- asyncio.set_event_loop(self.loop)
- sem = asyncio.Semaphore()
- self.assertIs(sem._loop, self.loop)
- finally:
- asyncio.set_event_loop(None)
+ asyncio.set_event_loop(self.loop)
+ sem = asyncio.Semaphore()
+ self.assertIs(sem._loop, self.loop)
def test_initial_value_zero(self):
sem = asyncio.Semaphore(0, loop=self.loop)
from asyncio import test_utils
-class ProactorSocketTransportTests(unittest.TestCase):
+class ProactorSocketTransportTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
self.proactor = mock.Mock()
self.loop._proactor = self.proactor
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
tr.close()
-class BaseProactorEventLoopTests(unittest.TestCase):
+class BaseProactorEventLoopTests(test_utils.TestCase):
def setUp(self):
self.sock = mock.Mock(socket.socket)
return (self.ssock, self.csock)
self.loop = EventLoop(self.proactor)
+ self.set_event_loop(self.loop, cleanup=False)
@mock.patch.object(BaseProactorEventLoop, 'call_soon')
@mock.patch.object(BaseProactorEventLoop, '_socketpair')
from asyncio import test_utils
-class _QueueTestBase(unittest.TestCase):
+class _QueueTestBase(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.loop = self.new_test_loop()
class QueueBasicTests(_QueueTestBase):
self.assertAlmostEqual(0.2, when)
yield 0.1
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
q = asyncio.Queue(loop=loop)
self.assertTrue(fn(q).startswith('<Queue'), fn(q))
self.assertIs(q._loop, self.loop)
def test_ctor_noloop(self):
- try:
- asyncio.set_event_loop(self.loop)
- q = asyncio.Queue()
- self.assertIs(q._loop, self.loop)
- finally:
- asyncio.set_event_loop(None)
+ asyncio.set_event_loop(self.loop)
+ q = asyncio.Queue()
+ self.assertIs(q._loop, self.loop)
def test_repr(self):
self._test_repr_or_str(repr, True)
self.assertAlmostEqual(0.02, when)
yield 0.01
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
q = asyncio.Queue(maxsize=2, loop=loop)
self.assertEqual(2, q.maxsize)
self.assertAlmostEqual(0.01, when)
yield 0.01
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
q = asyncio.Queue(loop=loop)
started = asyncio.Event(loop=loop)
self.assertAlmostEqual(0.061, when)
yield 0.05
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
q = asyncio.Queue(loop=loop)
self.assertAlmostEqual(0.01, when)
yield 0.01
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
q = asyncio.Queue(maxsize=1, loop=loop)
started = asyncio.Event(loop=loop)
return bytearray().join(l)
-class BaseSelectorEventLoopTests(unittest.TestCase):
+class BaseSelectorEventLoopTests(test_utils.TestCase):
def setUp(self):
selector = mock.Mock()
self.loop = TestBaseSelectorEventLoop(selector)
+ self.set_event_loop(self.loop, cleanup=False)
def test_make_socket_transport(self):
m = mock.Mock()
self.loop.remove_writer.assert_called_with(1)
-class SelectorTransportTests(unittest.TestCase):
+class SelectorTransportTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.sock = mock.Mock(socket.socket)
self.sock.fileno.return_value = 7
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(2, sys.getrefcount(self.loop),
+ self.assertEqual(3, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
-class SelectorSocketTransportTests(unittest.TestCase):
+class SelectorSocketTransportTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.sock = mock.Mock(socket.socket)
self.sock_fd = self.sock.fileno.return_value = 7
@unittest.skipIf(ssl is None, 'No ssl module')
-class SelectorSslTransportTests(unittest.TestCase):
+class SelectorSslTransportTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.sock = mock.Mock(socket.socket)
self.sock.fileno.return_value = 7
_SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
-class SelectorDatagramTransportTests(unittest.TestCase):
+class SelectorDatagramTransportTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.DatagramProtocol)
self.sock = mock.Mock(spec_set=socket.socket)
self.sock.fileno.return_value = 7
from asyncio import test_utils
-class StreamReaderTests(unittest.TestCase):
+class StreamReaderTests(test_utils.TestCase):
DATA = b'line1\nline2\nline3\n'
def setUp(self):
self.loop = asyncio.new_event_loop()
- asyncio.set_event_loop(None)
+ self.set_event_loop(self.loop)
def tearDown(self):
# just in case if we have transport close callbacks
self.loop.close()
gc.collect()
+ super().tearDown()
@mock.patch('asyncio.streams.events')
def test_ctor_global_loop(self, m_events):
from asyncio import subprocess
+from asyncio import test_utils
import asyncio
import signal
import sys
policy = asyncio.get_event_loop_policy()
policy.set_child_watcher(None)
self.loop.close()
- policy.set_event_loop(None)
+ super().tearDown()
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
- unittest.TestCase):
+ test_utils.TestCase):
Watcher = unix_events.SafeChildWatcher
class SubprocessFastWatcherTests(SubprocessWatcherMixin,
- unittest.TestCase):
+ test_utils.TestCase):
Watcher = unix_events.FastChildWatcher
else:
# Windows
- class SubprocessProactorTests(SubprocessMixin, unittest.TestCase):
+ class SubprocessProactorTests(SubprocessMixin, test_utils.TestCase):
def setUp(self):
policy = asyncio.get_event_loop_policy()
policy = asyncio.get_event_loop_policy()
self.loop.close()
policy.set_event_loop(None)
+ super().tearDown()
if __name__ == '__main__':
pass
-class TaskTests(unittest.TestCase):
+class TaskTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
- gc.collect()
+ self.loop = self.new_test_loop()
def test_task_class(self):
@asyncio.coroutine
self.assertIs(t._loop, self.loop)
loop = asyncio.new_event_loop()
+ self.set_event_loop(loop)
t = asyncio.Task(notmuch(), loop=loop)
self.assertIs(t._loop, loop)
loop.close()
self.assertIs(t._loop, self.loop)
loop = asyncio.new_event_loop()
+ self.set_event_loop(loop)
t = asyncio.async(notmuch(), loop=loop)
self.assertIs(t._loop, loop)
loop.close()
self.assertIs(f, f_orig)
loop = asyncio.new_event_loop()
+ self.set_event_loop(loop)
with self.assertRaises(ValueError):
f = asyncio.async(f_orig, loop=loop)
self.assertIs(t, t_orig)
loop = asyncio.new_event_loop()
+ self.set_event_loop(loop)
with self.assertRaises(ValueError):
t = asyncio.async(t_orig, loop=loop)
self.assertAlmostEqual(10.0, when)
yield 0
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
@asyncio.coroutine
def task():
def test_cancel_current_task(self):
loop = asyncio.new_event_loop()
- self.addCleanup(loop.close)
+ self.set_event_loop(loop)
@asyncio.coroutine
def task():
self.assertAlmostEqual(0.3, when)
yield 0.1
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
x = 0
waiters = []
self.assertAlmostEqual(0.1, when)
when = yield 0.1
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
foo_running = None
self.assertEqual(foo_running, False)
def test_wait_for_blocking(self):
- loop = test_utils.TestLoop()
- self.addCleanup(loop.close)
+ loop = self.new_test_loop()
@asyncio.coroutine
def coro():
self.assertAlmostEqual(0.01, when)
yield 0.01
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
@asyncio.coroutine
def foo():
self.assertAlmostEqual(0.15, when)
yield 0.15
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
self.assertAlmostEqual(0.015, when)
yield 0.015
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.Task(asyncio.sleep(0.01, loop=loop), loop=loop)
b = asyncio.Task(asyncio.sleep(0.015, loop=loop), loop=loop)
return 42
asyncio.set_event_loop(loop)
- try:
- res = loop.run_until_complete(
- asyncio.Task(foo(), loop=loop))
- finally:
- asyncio.set_event_loop(None)
+ res = loop.run_until_complete(
+ asyncio.Task(foo(), loop=loop))
self.assertEqual(res, 42)
self.assertAlmostEqual(0.1, when)
yield 0.1
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
b = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
self.assertAlmostEqual(10.0, when)
yield 0
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
# first_exception, task already has exception
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
self.assertAlmostEqual(0.01, when)
yield 0.01
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
# first_exception, exception during waiting
a = asyncio.Task(asyncio.sleep(10.0, loop=loop), loop=loop)
self.assertAlmostEqual(0.15, when)
yield 0.15
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
self.assertAlmostEqual(0.11, when)
yield 0.11
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
self.assertAlmostEqual(0.1, when)
yield 0.1
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.Task(asyncio.sleep(0.1, loop=loop), loop=loop)
b = asyncio.Task(asyncio.sleep(0.15, loop=loop), loop=loop)
yield 0.01
yield 0
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
completed = set()
time_shifted = False
yield 0
yield 0.1
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.sleep(0.1, 'a', loop=loop)
b = asyncio.sleep(0.15, 'b', loop=loop)
yield 0
yield 0.01
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.sleep(0.01, 'a', loop=loop)
yield 0.05
yield 0
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.sleep(0.05, 'a', loop=loop)
b = asyncio.sleep(0.10, 'b', loop=loop)
self.assertAlmostEqual(0.05, when)
yield 0.05
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
a = asyncio.sleep(0.05, 'a', loop=loop)
b = asyncio.sleep(0.05, 'b', loop=loop)
self.assertAlmostEqual(0.1, when)
yield 0.05
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
@asyncio.coroutine
def sleeper(dt, arg):
self.assertAlmostEqual(10.0, when)
yield 0
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
t = asyncio.Task(asyncio.sleep(10.0, 'yeah', loop=loop),
loop=loop)
self.assertAlmostEqual(5000, when)
yield 0.1
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
@asyncio.coroutine
def sleep(dt):
self.assertAlmostEqual(10.0, when)
yield 0
- loop = test_utils.TestLoop(gen)
- self.addCleanup(loop.close)
+ loop = self.new_test_loop(gen)
@asyncio.coroutine
def sleeper():
class GatherTestsBase:
def setUp(self):
- self.one_loop = test_utils.TestLoop()
- self.other_loop = test_utils.TestLoop()
-
- def tearDown(self):
- self.one_loop.close()
- self.other_loop.close()
+ self.one_loop = self.new_test_loop()
+ self.other_loop = self.new_test_loop()
+ self.set_event_loop(self.one_loop, cleanup=False)
def _run_loop(self, loop):
while loop._ready:
self.assertEqual(stdout.rstrip(), b'False')
-class FutureGatherTests(GatherTestsBase, unittest.TestCase):
+class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
def wrap_futures(self, *futures):
return futures
cb.assert_called_once_with(fut)
-class CoroutineGatherTests(GatherTestsBase, unittest.TestCase):
+class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
def setUp(self):
super().setUp()
asyncio.set_event_loop(self.one_loop)
- def tearDown(self):
- asyncio.set_event_loop(None)
- super().tearDown()
-
def wrap_futures(self, *futures):
coros = []
for fut in futures:
@unittest.skipUnless(signal, 'Signals are not supported')
-class SelectorEventLoopSignalTests(unittest.TestCase):
+class SelectorEventLoopSignalTests(test_utils.TestCase):
def setUp(self):
self.loop = asyncio.SelectorEventLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.set_event_loop(self.loop)
def test_check_signal(self):
self.assertRaises(
@unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
'UNIX Sockets are not supported')
-class SelectorEventLoopUnixSocketTests(unittest.TestCase):
+class SelectorEventLoopUnixSocketTests(test_utils.TestCase):
def setUp(self):
self.loop = asyncio.SelectorEventLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
+ self.set_event_loop(self.loop)
def test_create_unix_server_existing_path_sock(self):
with test_utils.unix_socket_path() as path:
self.loop.run_until_complete(coro)
-class UnixReadPipeTransportTests(unittest.TestCase):
+class UnixReadPipeTransportTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.Protocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(4, sys.getrefcount(self.loop),
+ self.assertEqual(5, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
def test__call_connection_lost_with_err(self):
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(4, sys.getrefcount(self.loop),
+ self.assertEqual(5, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
-class UnixWritePipeTransportTests(unittest.TestCase):
+class UnixWritePipeTransportTests(test_utils.TestCase):
def setUp(self):
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)
self.pipe = mock.Mock(spec_set=io.RawIOBase)
self.pipe.fileno.return_value = 5
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(4, sys.getrefcount(self.loop),
+ self.assertEqual(5, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
def test__call_connection_lost_with_err(self):
self.assertEqual(2, sys.getrefcount(self.protocol),
pprint.pformat(gc.get_referrers(self.protocol)))
self.assertIsNone(tr._loop)
- self.assertEqual(4, sys.getrefcount(self.loop),
+ self.assertEqual(5, sys.getrefcount(self.loop),
pprint.pformat(gc.get_referrers(self.loop)))
def test_close(self):
ignore_warnings = mock.patch.object(log.logger, "warning")
def setUp(self):
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
self.running = False
self.zombies = {}
# attach a new loop
old_loop = self.loop
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
patch = mock.patch.object
with patch(old_loop, "remove_signal_handler") as m_old_remove, \
self.assertFalse(callback3.called)
# attach a new loop
- self.loop = test_utils.TestLoop()
+ self.loop = self.new_test_loop()
with mock.patch.object(
self.loop, "add_signal_handler") as m_add_signal_handler:
self.assertFalse(self.watcher._zombies)
-class SafeChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase):
+class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
def create_watcher(self):
return asyncio.SafeChildWatcher()
-class FastChildWatcherTests (ChildWatcherTestsMixin, unittest.TestCase):
+class FastChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase):
def create_watcher(self):
return asyncio.FastChildWatcher()
self.trans.close()
-class ProactorTests(unittest.TestCase):
+class ProactorTests(test_utils.TestCase):
def setUp(self):
self.loop = asyncio.ProactorEventLoop()
- asyncio.set_event_loop(None)
-
- def tearDown(self):
- self.loop.close()
- self.loop = None
+ self.set_event_loop(self.loop)
def test_close(self):
a, b = self.loop._socketpair()