From: Victor Stinner Date: Tue, 25 Nov 2014 16:20:33 +0000 (+0100) Subject: Closes #22685, asyncio: Set the transport of stdout and stderr StreamReader X-Git-Tag: v3.4.3rc1~309 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=5ef586f25a6d5128a15341e849d7dca4fe882d22;p=python Closes #22685, asyncio: Set the transport of stdout and stderr StreamReader objects in the SubprocessStreamProtocol. It allows to pause the transport to not buffer too much stdout or stderr data. --- diff --git a/Lib/asyncio/subprocess.py b/Lib/asyncio/subprocess.py index e4c14995a7..f6d6a141eb 100644 --- a/Lib/asyncio/subprocess.py +++ b/Lib/asyncio/subprocess.py @@ -41,15 +41,22 @@ class SubprocessStreamProtocol(streams.FlowControlMixin, def connection_made(self, transport): self._transport = transport - if transport.get_pipe_transport(1): + + stdout_transport = transport.get_pipe_transport(1) + if stdout_transport is not None: self.stdout = streams.StreamReader(limit=self._limit, loop=self._loop) - if transport.get_pipe_transport(2): + self.stdout.set_transport(stdout_transport) + + stderr_transport = transport.get_pipe_transport(2) + if stderr_transport is not None: self.stderr = streams.StreamReader(limit=self._limit, loop=self._loop) - stdin = transport.get_pipe_transport(0) - if stdin is not None: - self.stdin = streams.StreamWriter(stdin, + self.stderr.set_transport(stderr_transport) + + stdin_transport = transport.get_pipe_transport(0) + if stdin_transport is not None: + self.stdin = streams.StreamWriter(stdin_transport, protocol=self, reader=None, loop=self._loop) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 0e9e1ce5f9..d0ab23080d 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -4,6 +4,7 @@ import asyncio import signal import sys import unittest +from unittest import mock from test import support if sys.platform != 'win32': from asyncio import unix_events @@ -161,6 +162,37 @@ class SubprocessMixin: self.loop.run_until_complete(proc.communicate(large_data)) self.loop.run_until_complete(proc.wait()) + def test_pause_reading(self): + @asyncio.coroutine + def test_pause_reading(): + limit = 100 + + code = '\n'.join(( + 'import sys', + 'sys.stdout.write("x" * %s)' % (limit * 2 + 1), + 'sys.stdout.flush()', + )) + proc = yield from asyncio.create_subprocess_exec( + sys.executable, '-c', code, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + limit=limit, + loop=self.loop) + stdout_transport = proc._transport.get_pipe_transport(1) + stdout_transport.pause_reading = mock.Mock() + + yield from proc.wait() + + # The child process produced more than limit bytes of output, + # the stream reader transport should pause the protocol to not + # allocate too much memory. + return stdout_transport.pause_reading.called + + # Issue #22685: Ensure that the stream reader pauses the protocol + # when the child process produces too much data + called = self.loop.run_until_complete(test_pause_reading()) + self.assertTrue(called) + if sys.platform != 'win32': # Unix