import tempfile
import threading
import time
-import unittest
from unittest import mock
from http.server import HTTPServer
import time
import unittest
from unittest import mock
-from test.support import find_unused_port, IPV6_ENABLED
+from test.support import IPV6_ENABLED
import asyncio
from asyncio import base_events
import asyncio
-from asyncio import events
from asyncio import selector_events
from asyncio import test_utils
def test_subprocess_wait_no_same_group(self):
proto = None
- transp = None
@asyncio.coroutine
def connect():
nonlocal proto
# start the new process in a new session
- transp, proto = yield from self.loop.subprocess_shell(
+ _, proto = yield from self.loop.subprocess_shell(
functools.partial(MySubprocessProtocol, self.loop),
'exit 7', stdin=None, stdout=None, stderr=None,
start_new_session=True)
"""Tests for selector_events.py"""
-import collections
import errno
import gc
import pprint
def test_ssl_transport_requires_ssl_module(self):
Mock = mock.Mock
with self.assertRaises(RuntimeError):
- transport = _SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
+ _SelectorSslTransport(Mock(), Mock(), Mock(), Mock())
class SelectorDatagramTransportTests(unittest.TestCase):
"""Tests for streams.py."""
-import functools
import gc
import socket
import unittest
v = yield from f
self.assertEqual(v, 'a')
- res = loop.run_until_complete(asyncio.Task(foo(), loop=loop))
+ loop.run_until_complete(asyncio.Task(foo(), loop=loop))
def test_as_completed_reverse_wait(self):
loop = test_utils.TestLoop(gen)
self.addCleanup(loop.close)
- sleepfut = None
-
@asyncio.coroutine
def sleep(dt):
- nonlocal sleepfut
- sleepfut = asyncio.sleep(dt, loop=loop)
- yield from sleepfut
+ yield from asyncio.sleep(dt, loop=loop)
@asyncio.coroutine
def doit():
m.waitpid.side_effect = ValueError
with mock.patch.object(log.logger,
- 'error') as m_error:
+ 'error') as m_error:
self.assertEqual(self.watcher._sig_chld(), None)
self.assertTrue(m_error.called)
# attach a new loop
old_loop = self.loop
self.loop = test_utils.TestLoop()
+ patch = mock.patch.object
- with mock.patch.object(
- old_loop,
- "remove_signal_handler") as m_old_remove_signal_handler, \
- mock.patch.object(
- self.loop,
- "add_signal_handler") as m_new_add_signal_handler:
+ with patch(old_loop, "remove_signal_handler") as m_old_remove, \
+ patch(self.loop, "add_signal_handler") as m_new_add:
self.watcher.attach_loop(self.loop)
- m_old_remove_signal_handler.assert_called_once_with(
+ m_old_remove.assert_called_once_with(
signal.SIGCHLD)
- m_new_add_signal_handler.assert_called_once_with(
+ m_new_add.assert_called_once_with(
signal.SIGCHLD, self.watcher._sig_chld)
# child terminates
def test_close(self, m):
# register two children
callback1 = mock.Mock()
- callback2 = mock.Mock()
with self.watcher:
self.running = True
import _winapi
import asyncio
-from asyncio import test_utils
from asyncio import _overlapped
from asyncio import windows_events
ADDRESS = r'\\.\pipe\test_double_bind-%s' % os.getpid()
server1 = windows_events.PipeServer(ADDRESS)
with self.assertRaises(PermissionError):
- server2 = windows_events.PipeServer(ADDRESS)
+ windows_events.PipeServer(ADDRESS)
server1.close()
def test_pipe(self):