from .protocols import *
from .queues import *
from .streams import *
+from .subprocess import *
from .tasks import *
from .transports import *
protocols.__all__ +
queues.__all__ +
streams.__all__ +
+ subprocess.__all__ +
tasks.__all__ +
transports.__all__)
from . import transports
-STDIN = 0
-STDOUT = 1
-STDERR = 2
-
-
class BaseSubprocessTransport(transports.SubprocessTransport):
def __init__(self, loop, protocol, args, shell,
self._pipes = {}
if stdin == subprocess.PIPE:
- self._pipes[STDIN] = None
+ self._pipes[0] = None
if stdout == subprocess.PIPE:
- self._pipes[STDOUT] = None
+ self._pipes[1] = None
if stderr == subprocess.PIPE:
- self._pipes[STDERR] = None
+ self._pipes[2] = None
self._pending_calls = collections.deque()
self._finished = False
self._returncode = None
loop = self._loop
if proc.stdin is not None:
_, pipe = yield from loop.connect_write_pipe(
- lambda: WriteSubprocessPipeProto(self, STDIN),
+ lambda: WriteSubprocessPipeProto(self, 0),
proc.stdin)
- self._pipes[STDIN] = pipe
+ self._pipes[0] = pipe
if proc.stdout is not None:
_, pipe = yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(self, STDOUT),
+ lambda: ReadSubprocessPipeProto(self, 1),
proc.stdout)
- self._pipes[STDOUT] = pipe
+ self._pipes[1] = pipe
if proc.stderr is not None:
_, pipe = yield from loop.connect_read_pipe(
- lambda: ReadSubprocessPipeProto(self, STDERR),
+ lambda: ReadSubprocessPipeProto(self, 2),
proc.stderr)
- self._pipes[STDERR] = pipe
+ self._pipes[2] = pipe
assert self._pending_calls is not None
self._buffer = None # None or bytearray.
self._read_fut = None
self._write_fut = None
+ self._pending_write = 0
self._conn_lost = 0
self._closing = False # Set when close() called.
self._eof_written = False
if self._read_fut:
self._read_fut.cancel()
self._write_fut = self._read_fut = None
+ self._pending_write = 0
self._buffer = None
self._loop.call_soon(self._call_connection_lost, exc)
self._low_water = low
def get_write_buffer_size(self):
- # NOTE: This doesn't take into account data already passed to
- # send() even if send() hasn't finished yet.
- if not self._buffer:
- return 0
- return len(self._buffer)
+ size = self._pending_write
+ if self._buffer is not None:
+ size += len(self._buffer)
+ return size
class _ProactorReadPipeTransport(_ProactorBasePipeTransport,
class _ProactorBaseWritePipeTransport(_ProactorBasePipeTransport,
- transports.WriteTransport):
+ transports.WriteTransport):
"""Transport for write pipes."""
def write(self, data):
try:
assert f is self._write_fut
self._write_fut = None
+ self._pending_write = 0
if f:
f.result()
if data is None:
self._loop.call_soon(self._call_connection_lost, None)
if self._eof_written:
self._sock.shutdown(socket.SHUT_WR)
+ # Now that we've reduced the buffer size, tell the
+ # protocol to resume writing if it was paused. Note that
+ # we do this last since the callback is called immediately
+ # and it may add more data to the buffer (even causing the
+ # protocol to be paused again).
+ self._maybe_resume_protocol()
else:
self._write_fut = self._loop._proactor.send(self._sock, data)
- self._write_fut.add_done_callback(self._loop_writing)
- # Now that we've reduced the buffer size, tell the
- # protocol to resume writing if it was paused. Note that
- # we do this last since the callback is called immediately
- # and it may add more data to the buffer (even causing the
- # protocol to be paused again).
- self._maybe_resume_protocol()
+ if not self._write_fut.done():
+ assert self._pending_write == 0
+ self._pending_write = len(data)
+ self._write_fut.add_done_callback(self._loop_writing)
+ self._maybe_pause_protocol()
+ else:
+ self._write_fut.add_done_callback(self._loop_writing)
except ConnectionResetError as exc:
self._force_close(exc)
except OSError as exc:
--- /dev/null
+__all__ = ['create_subprocess_exec', 'create_subprocess_shell']
+
+import collections
+import subprocess
+
+from . import events
+from . import futures
+from . import protocols
+from . import streams
+from . import tasks
+
+
+PIPE = subprocess.PIPE
+STDOUT = subprocess.STDOUT
+DEVNULL = subprocess.DEVNULL
+
+
+class SubprocessStreamProtocol(streams.FlowControlMixin,
+ protocols.SubprocessProtocol):
+ """Like StreamReaderProtocol, but for a subprocess."""
+
+ def __init__(self, limit, loop):
+ super().__init__(loop=loop)
+ self._limit = limit
+ self.stdin = self.stdout = self.stderr = None
+ self.waiter = futures.Future(loop=loop)
+ self._waiters = collections.deque()
+ self._transport = None
+
+ def connection_made(self, transport):
+ self._transport = transport
+ if transport.get_pipe_transport(1):
+ self.stdout = streams.StreamReader(limit=self._limit,
+ loop=self._loop)
+ if transport.get_pipe_transport(2):
+ self.stderr = streams.StreamReader(limit=self._limit,
+ loop=self._loop)
+ stdin = transport.get_pipe_transport(0)
+ if stdin is not None:
+ self.stdin = streams.StreamWriter(stdin,
+ protocol=self,
+ reader=None,
+ loop=self._loop)
+ self.waiter.set_result(None)
+
+ def pipe_data_received(self, fd, data):
+ if fd == 1:
+ reader = self.stdout
+ elif fd == 2:
+ reader = self.stderr
+ else:
+ reader = None
+ if reader is not None:
+ reader.feed_data(data)
+
+ def pipe_connection_lost(self, fd, exc):
+ if fd == 0:
+ pipe = self.stdin
+ if pipe is not None:
+ pipe.close()
+ self.connection_lost(exc)
+ return
+ if fd == 1:
+ reader = self.stdout
+ elif fd == 2:
+ reader = self.stderr
+ else:
+ reader = None
+ if reader != None:
+ if exc is None:
+ reader.feed_eof()
+ else:
+ reader.set_exception(exc)
+
+ def process_exited(self):
+ # wake up futures waiting for wait()
+ returncode = self._transport.get_returncode()
+ while self._waiters:
+ waiter = self._waiters.popleft()
+ waiter.set_result(returncode)
+
+
+class Process:
+ def __init__(self, transport, protocol, loop):
+ self._transport = transport
+ self._protocol = protocol
+ self._loop = loop
+ self.stdin = protocol.stdin
+ self.stdout = protocol.stdout
+ self.stderr = protocol.stderr
+ self.pid = transport.get_pid()
+
+ @property
+ def returncode(self):
+ return self._transport.get_returncode()
+
+ @tasks.coroutine
+ def wait(self):
+ """Wait until the process exit and return the process return code."""
+ returncode = self._transport.get_returncode()
+ if returncode is not None:
+ return returncode
+
+ waiter = futures.Future(loop=self._loop)
+ self._protocol._waiters.append(waiter)
+ yield from waiter
+ return waiter.result()
+
+ def get_subprocess(self):
+ return self._transport.get_extra_info('subprocess')
+
+ def _check_alive(self):
+ if self._transport.get_returncode() is not None:
+ raise ProcessLookupError()
+
+ def send_signal(self, signal):
+ self._check_alive()
+ self._transport.send_signal(signal)
+
+ def terminate(self):
+ self._check_alive()
+ self._transport.terminate()
+
+ def kill(self):
+ self._check_alive()
+ self._transport.kill()
+
+ @tasks.coroutine
+ def _feed_stdin(self, input):
+ self.stdin.write(input)
+ yield from self.stdin.drain()
+ self.stdin.close()
+
+ @tasks.coroutine
+ def _noop(self):
+ return None
+
+ @tasks.coroutine
+ def _read_stream(self, fd):
+ transport = self._transport.get_pipe_transport(fd)
+ if fd == 2:
+ stream = self.stderr
+ else:
+ assert fd == 1
+ stream = self.stdout
+ output = yield from stream.read()
+ transport.close()
+ return output
+
+ @tasks.coroutine
+ def communicate(self, input=None):
+ loop = self._transport._loop
+ if input:
+ stdin = self._feed_stdin(input)
+ else:
+ stdin = self._noop()
+ if self.stdout is not None:
+ stdout = self._read_stream(1)
+ else:
+ stdout = self._noop()
+ if self.stderr is not None:
+ stderr = self._read_stream(2)
+ else:
+ stderr = self._noop()
+ stdin, stdout, stderr = yield from tasks.gather(stdin, stdout, stderr,
+ loop=loop)
+ yield from self.wait()
+ return (stdout, stderr)
+
+
+@tasks.coroutine
+def create_subprocess_shell(cmd, stdin=None, stdout=None, stderr=None,
+ loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+ if loop is None:
+ loop = events.get_event_loop()
+ protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
+ loop=loop)
+ transport, protocol = yield from loop.subprocess_shell(
+ protocol_factory,
+ cmd, stdin=stdin, stdout=stdout,
+ stderr=stderr, **kwds)
+ yield from protocol.waiter
+ return Process(transport, protocol, loop)
+
+@tasks.coroutine
+def create_subprocess_exec(*args, stdin=None, stdout=None, stderr=None,
+ loop=None, limit=streams._DEFAULT_LIMIT, **kwds):
+ if loop is None:
+ loop = events.get_event_loop()
+ protocol_factory = lambda: SubprocessStreamProtocol(limit=limit,
+ loop=loop)
+ transport, protocol = yield from loop.subprocess_exec(
+ protocol_factory,
+ *args, stdin=stdin, stdout=stdout,
+ stderr=stderr, **kwds)
+ yield from protocol.waiter
+ return Process(transport, protocol, loop)
from .log import logger
-__all__ = ['SelectorEventLoop', 'STDIN', 'STDOUT', 'STDERR',
+__all__ = ['SelectorEventLoop',
'AbstractChildWatcher', 'SafeChildWatcher',
'FastChildWatcher', 'DefaultEventLoopPolicy',
]
-STDIN = 0
-STDOUT = 1
-STDERR = 2
-
-
if sys.platform == 'win32': # pragma: no cover
raise ImportError('Signals are not really supported on Windows')
self.loop.stop()
self.loop._process_events = unittest.mock.Mock()
- delay = 0.1
-
- when = self.loop.time() + delay
+ when = self.loop.time() + 0.1
self.loop.call_at(when, cb)
t0 = self.loop.time()
self.loop.run_forever()
dt = self.loop.time() - t0
-
- self.assertGreaterEqual(dt, delay - self.loop._granularity, dt)
- # tolerate a difference of +800 ms because some Python buildbots
- # are really slow
- self.assertLessEqual(dt, 0.9, dt)
+ self.assertTrue(0.09 <= dt <= 0.9,
+ # Issue #20452: add more info in case of failure,
+ # to try to investigate the bug
+ (dt,
+ self.loop._granularity,
+ time.get_clock_info('monotonic')))
def test_run_once_in_executor_handle(self):
def cb():
calls.append(self.loop._run_once_counter)
self.assertEqual(calls, [1, 3, 5, 6])
- def test_granularity(self):
- granularity = self.loop._granularity
- self.assertGreater(granularity, 0.0)
- # Worst expected granularity: 1 ms on Linux (limited by poll/epoll
- # resolution), 15.6 ms on Windows (limited by time.monotonic
- # resolution)
- self.assertLess(granularity, 0.050)
-
class SubprocessTestsMixin:
--- /dev/null
+from asyncio import subprocess
+import asyncio
+import signal
+import sys
+import unittest
+from test import support
+if sys.platform != 'win32':
+ from asyncio import unix_events
+
+# Program exiting quickly
+PROGRAM_EXIT_FAST = [sys.executable, '-c', 'pass']
+
+# Program blocking
+PROGRAM_BLOCKED = [sys.executable, '-c', 'import time; time.sleep(3600)']
+
+# Program sleeping during 1 second
+PROGRAM_SLEEP_1SEC = [sys.executable, '-c', 'import time; time.sleep(1)']
+
+# Program copying input to output
+PROGRAM_CAT = [
+ sys.executable, '-c',
+ ';'.join(('import sys',
+ 'data = sys.stdin.buffer.read()',
+ 'sys.stdout.buffer.write(data)'))]
+
+class SubprocessMixin:
+ def test_stdin_stdout(self):
+ args = PROGRAM_CAT
+
+ @asyncio.coroutine
+ def run(data):
+ proc = yield from asyncio.create_subprocess_exec(
+ *args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ loop=self.loop)
+
+ # feed data
+ proc.stdin.write(data)
+ yield from proc.stdin.drain()
+ proc.stdin.close()
+
+ # get output and exitcode
+ data = yield from proc.stdout.read()
+ exitcode = yield from proc.wait()
+ return (exitcode, data)
+
+ task = run(b'some data')
+ task = asyncio.wait_for(task, 10.0, loop=self.loop)
+ exitcode, stdout = self.loop.run_until_complete(task)
+ self.assertEqual(exitcode, 0)
+ self.assertEqual(stdout, b'some data')
+
+ def test_communicate(self):
+ args = PROGRAM_CAT
+
+ @asyncio.coroutine
+ def run(data):
+ proc = yield from asyncio.create_subprocess_exec(
+ *args,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ loop=self.loop)
+ stdout, stderr = yield from proc.communicate(data)
+ return proc.returncode, stdout
+
+ task = run(b'some data')
+ task = asyncio.wait_for(task, 10.0, loop=self.loop)
+ exitcode, stdout = self.loop.run_until_complete(task)
+ self.assertEqual(exitcode, 0)
+ self.assertEqual(stdout, b'some data')
+
+ def test_shell(self):
+ create = asyncio.create_subprocess_shell('exit 7',
+ loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ exitcode = self.loop.run_until_complete(proc.wait())
+ self.assertEqual(exitcode, 7)
+
+ def test_start_new_session(self):
+ # start the new process in a new session
+ create = asyncio.create_subprocess_shell('exit 8',
+ start_new_session=True,
+ loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ exitcode = self.loop.run_until_complete(proc.wait())
+ self.assertEqual(exitcode, 8)
+
+ def test_kill(self):
+ args = PROGRAM_BLOCKED
+ create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ proc.kill()
+ returncode = self.loop.run_until_complete(proc.wait())
+ if sys.platform == 'win32':
+ self.assertIsInstance(returncode, int)
+ # expect 1 but sometimes get 0
+ else:
+ self.assertEqual(-signal.SIGKILL, returncode)
+
+ def test_terminate(self):
+ args = PROGRAM_BLOCKED
+ create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ proc.terminate()
+ returncode = self.loop.run_until_complete(proc.wait())
+ if sys.platform == 'win32':
+ self.assertIsInstance(returncode, int)
+ # expect 1 but sometimes get 0
+ else:
+ self.assertEqual(-signal.SIGTERM, returncode)
+
+ @unittest.skipIf(sys.platform == 'win32', "Don't have SIGHUP")
+ def test_send_signal(self):
+ args = PROGRAM_BLOCKED
+ create = asyncio.create_subprocess_exec(*args, loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ proc.send_signal(signal.SIGHUP)
+ returncode = self.loop.run_until_complete(proc.wait())
+ self.assertEqual(-signal.SIGHUP, returncode)
+
+ def test_get_subprocess(self):
+ args = PROGRAM_EXIT_FAST
+
+ @asyncio.coroutine
+ def run():
+ proc = yield from asyncio.create_subprocess_exec(*args,
+ loop=self.loop)
+ yield from proc.wait()
+
+ popen = proc.get_subprocess()
+ popen.wait()
+ return (proc, popen)
+
+ proc, popen = self.loop.run_until_complete(run())
+ self.assertEqual(popen.returncode, proc.returncode)
+ self.assertEqual(popen.pid, proc.pid)
+
+ def test_broken_pipe(self):
+ large_data = b'x' * support.PIPE_MAX_SIZE
+
+ create = asyncio.create_subprocess_exec(
+ *PROGRAM_SLEEP_1SEC,
+ stdin=subprocess.PIPE,
+ loop=self.loop)
+ proc = self.loop.run_until_complete(create)
+ with self.assertRaises(BrokenPipeError):
+ self.loop.run_until_complete(proc.communicate(large_data))
+ self.loop.run_until_complete(proc.wait())
+
+
+if sys.platform != 'win32':
+ # Unix
+ class SubprocessWatcherMixin(SubprocessMixin):
+ Watcher = None
+
+ def setUp(self):
+ policy = asyncio.get_event_loop_policy()
+ self.loop = policy.new_event_loop()
+
+ # ensure that the event loop is passed explicitly in the code
+ policy.set_event_loop(None)
+
+ watcher = self.Watcher()
+ watcher.attach_loop(self.loop)
+ policy.set_child_watcher(watcher)
+
+ def tearDown(self):
+ policy = asyncio.get_event_loop_policy()
+ policy.set_child_watcher(None)
+ self.loop.close()
+ policy.set_event_loop(None)
+
+ class SubprocessSafeWatcherTests(SubprocessWatcherMixin, unittest.TestCase):
+ Watcher = unix_events.SafeChildWatcher
+
+ class SubprocessFastWatcherTests(SubprocessWatcherMixin, unittest.TestCase):
+ Watcher = unix_events.FastChildWatcher
+else:
+ # Windows
+ class SubprocessProactorTests(SubprocessMixin, unittest.TestCase):
+ def setUp(self):
+ policy = asyncio.get_event_loop_policy()
+ self.loop = asyncio.ProactorEventLoop()
+
+ # ensure that the event loop is passed explicitly in the code
+ policy.set_event_loop(None)
+
+ def tearDown(self):
+ policy = asyncio.get_event_loop_policy()
+ self.loop.close()
+ policy.set_event_loop(None)
+
+
+if __name__ == '__main__':
+ unittest.main()
self.loop.run_until_complete(f)
elapsed = self.loop.time() - start
self.assertFalse(f.result())
- self.assertTrue(0.18 < elapsed < 0.9, elapsed)
+ self.assertTrue(0.18 < elapsed < 0.5, elapsed)
_overlapped.SetEvent(event)