return b''
if n < 0:
- while not self._eof:
- self._waiter = self._create_waiter('read')
- try:
- yield from self._waiter
- finally:
- self._waiter = None
+ # This used to just loop creating a new waiter hoping to
+ # collect everything in self._buffer, but that would
+ # deadlock if the subprocess sends more than self.limit
+ # bytes. So just call self.read(self._limit) until EOF.
+ blocks = []
+ while True:
+ block = yield from self.read(self._limit)
+ if not block:
+ break
+ blocks.append(block)
+ return b''.join(blocks)
else:
if not self._buffer and not self._eof:
self._waiter = self._create_waiter('read')
"""Tests for streams.py."""
import gc
+import os
import socket
+import sys
import unittest
from unittest import mock
try:
server.stop()
self.assertEqual(msg, b"hello world!\n")
+ @unittest.skipIf(sys.platform == 'win32', "Don't have pipes")
+ def test_read_all_from_pipe_reader(self):
+ # See Tulip issue 168. This test is derived from the example
+ # subprocess_attach_read_pipe.py, but we configure the
+ # StreamReader's limit so that twice it is less than the size
+ # of the data writter. Also we must explicitly attach a child
+ # watcher to the event loop.
+
+ watcher = asyncio.get_child_watcher()
+ watcher.attach_loop(self.loop)
+
+ code = """\
+import os, sys
+fd = int(sys.argv[1])
+os.write(fd, b'data')
+os.close(fd)
+"""
+ rfd, wfd = os.pipe()
+ args = [sys.executable, '-c', code, str(wfd)]
+
+ pipe = open(rfd, 'rb', 0)
+ reader = asyncio.StreamReader(loop=self.loop, limit=1)
+ protocol = asyncio.StreamReaderProtocol(reader, loop=self.loop)
+ transport, _ = self.loop.run_until_complete(
+ self.loop.connect_read_pipe(lambda: protocol, pipe))
+
+ proc = self.loop.run_until_complete(
+ asyncio.create_subprocess_exec(*args, pass_fds={wfd}, loop=self.loop))
+ self.loop.run_until_complete(proc.wait())
+
+ os.close(wfd)
+ data = self.loop.run_until_complete(reader.read(-1))
+ self.assertEqual(data, b'data')
+
if __name__ == '__main__':
unittest.main()