]> granicus.if.org Git - python/commitdiff
asyncio: Fix upstream issue 168: StreamReader.read(-1) from pipe may hang if data...
authorGuido van Rossum <guido@python.org>
Mon, 12 May 2014 17:04:37 +0000 (10:04 -0700)
committerGuido van Rossum <guido@python.org>
Mon, 12 May 2014 17:04:37 +0000 (10:04 -0700)
Lib/asyncio/streams.py
Lib/test/test_asyncio/test_streams.py

index 27d595f17410c899342566e9e85b850427e46b63..e239248d11c5560a3174ce1ab1fbcd005eda6557 100644 (file)
@@ -419,12 +419,17 @@ class StreamReader:
             return b''
 
         if n < 0:
-            while not self._eof:
-                self._waiter = self._create_waiter('read')
-                try:
-                    yield from self._waiter
-                finally:
-                    self._waiter = None
+            # This used to just loop creating a new waiter hoping to
+            # collect everything in self._buffer, but that would
+            # deadlock if the subprocess sends more than self.limit
+            # bytes.  So just call self.read(self._limit) until EOF.
+            blocks = []
+            while True:
+                block = yield from self.read(self._limit)
+                if not block:
+                    break
+                blocks.append(block)
+            return b''.join(blocks)
         else:
             if not self._buffer and not self._eof:
                 self._waiter = self._create_waiter('read')
index 031499e8143c3ed2d1b52246364a654ba492b526..23012b72e64b0470547dda69ee8971906059a8cc 100644 (file)
@@ -1,7 +1,9 @@
 """Tests for streams.py."""
 
 import gc
+import os
 import socket
+import sys
 import unittest
 from unittest import mock
 try:
@@ -583,6 +585,40 @@ class StreamReaderTests(unittest.TestCase):
             server.stop()
             self.assertEqual(msg, b"hello world!\n")
 
+    @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
+    def test_read_all_from_pipe_reader(self):
+        # See Tulip issue 168.  This test is derived from the example
+        # subprocess_attach_read_pipe.py, but we configure the
+        # StreamReader's limit so that twice it is less than the size
+        # of the data writter.  Also we must explicitly attach a child
+        # watcher to the event loop.
+
+        watcher = asyncio.get_child_watcher()
+        watcher.attach_loop(self.loop)
+
+        code = """\
+import os, sys
+fd = int(sys.argv[1])
+os.write(fd, b'data')
+os.close(fd)
+"""
+        rfd, wfd = os.pipe()
+        args = [sys.executable, '-c', code, str(wfd)]
+
+        pipe = open(rfd, 'rb', 0)
+        reader = asyncio.StreamReader(loop=self.loop, limit=1)
+        protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
+        transport, _ = self.loop.run_until_complete(
+            self.loop.connect_read_pipe(lambda: protocol, pipe))
+
+        proc = self.loop.run_until_complete(
+            asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
+        self.loop.run_until_complete(proc.wait())
+
+        os.close(wfd)
+        data = self.loop.run_until_complete(reader.read(-1))
+        self.assertEqual(data, b'data')
+
 
 if __name__ == '__main__':
     unittest.main()