]> granicus.if.org Git - python/commitdiff
Merged revisions 73825-73826 via svnmerge from
authorGregory P. Smith <greg@mad-scientist.com>
Sat, 4 Jul 2009 02:46:54 +0000 (02:46 +0000)
committerGregory P. Smith <greg@mad-scientist.com>
Sat, 4 Jul 2009 02:46:54 +0000 (02:46 +0000)
svn+ssh://pythondev@svn.python.org/python/trunk

........
  r73825 | gregory.p.smith | 2009-07-03 18:49:29 -0700 (Fri, 03 Jul 2009) | 9 lines

  Use select.poll() in subprocess, when available, rather than select() so that
  it does not fail when file descriptors are large.  Fixes issue3392.

  Patch largely contributed by Frank Chu (fpmc) with some improvements by me.
  See http://bugs.python.org/issue3392.
........
  r73826 | gregory.p.smith | 2009-07-03 18:55:11 -0700 (Fri, 03 Jul 2009) | 2 lines

  news entry for r73825
........

Candidate for backporting to release31-maint as it is a bug fix and changes no
public API.

Lib/subprocess.py
Lib/test/test_subprocess.py
Misc/NEWS

index 368ba0e06f9396f924ca4ab83d93df2060584791..f7361e1559e5529bf986bed23e79592320a9a1d1 100644 (file)
@@ -371,6 +371,7 @@ if mswindows:
             error = IOError
 else:
     import select
+    _has_poll = hasattr(select, 'poll')
     import errno
     import fcntl
     import pickle
@@ -383,6 +384,11 @@ try:
 except:
     MAXFD = 256
 
+# When select or poll has indicated that the file is writable,
+# we can write up to _PIPE_BUF bytes without risk of blocking.
+# POSIX defines PIPE_BUF as >= 512.
+_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
+
 _active = []
 
 def _cleanup():
@@ -1173,19 +1179,103 @@ class Popen(object):
 
 
         def _communicate(self, input):
-            read_set = []
-            write_set = []
-            stdout = None # Return
-            stderr = None # Return
-
             if self.stdin:
                 # Flush stdio buffer.  This might block, if the user has
                 # been writing to .stdin in an uncontrolled fashion.
                 self.stdin.flush()
-                if input:
-                    write_set.append(self.stdin)
-                else:
+                if not input:
                     self.stdin.close()
+
+            if _has_poll:
+                stdout, stderr = self._communicate_with_poll(input)
+            else:
+                stdout, stderr = self._communicate_with_select(input)
+
+            # All data exchanged.  Translate lists into strings.
+            if stdout is not None:
+                stdout = b''.join(stdout)
+            if stderr is not None:
+                stderr = b''.join(stderr)
+
+            # Translate newlines, if requested.
+            # This also turns bytes into strings.
+            if self.universal_newlines:
+                if stdout is not None:
+                    stdout = self._translate_newlines(stdout,
+                                                      self.stdout.encoding)
+                if stderr is not None:
+                    stderr = self._translate_newlines(stderr,
+                                                      self.stderr.encoding)
+
+            self.wait()
+            return (stdout, stderr)
+
+
+        def _communicate_with_poll(self, input):
+            stdout = None # Return
+            stderr = None # Return
+            fd2file = {}
+            fd2output = {}
+
+            poller = select.poll()
+            def register_and_append(file_obj, eventmask):
+                poller.register(file_obj.fileno(), eventmask)
+                fd2file[file_obj.fileno()] = file_obj
+
+            def close_unregister_and_remove(fd):
+                poller.unregister(fd)
+                fd2file[fd].close()
+                fd2file.pop(fd)
+
+            if self.stdin and input:
+                register_and_append(self.stdin, select.POLLOUT)
+
+            select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
+            if self.stdout:
+                register_and_append(self.stdout, select_POLLIN_POLLPRI)
+                fd2output[self.stdout.fileno()] = stdout = []
+            if self.stderr:
+                register_and_append(self.stderr, select_POLLIN_POLLPRI)
+                fd2output[self.stderr.fileno()] = stderr = []
+
+            input_offset = 0
+            while fd2file:
+                try:
+                    ready = poller.poll()
+                except select.error as e:
+                    if e.args[0] == errno.EINTR:
+                        continue
+                    raise
+
+                # XXX Rewrite these to use non-blocking I/O on the
+                # file objects; they are no longer using C stdio!
+
+                for fd, mode in ready:
+                    if mode & select.POLLOUT:
+                        chunk = input[input_offset : input_offset + _PIPE_BUF]
+                        input_offset += os.write(fd, chunk)
+                        if input_offset >= len(input):
+                            close_unregister_and_remove(fd)
+                    elif mode & select_POLLIN_POLLPRI:
+                        data = os.read(fd, 4096)
+                        if not data:
+                            close_unregister_and_remove(fd)
+                        fd2output[fd].append(data)
+                    else:
+                        # Ignore hang up or errors.
+                        close_unregister_and_remove(fd)
+
+            return (stdout, stderr)
+
+
+        def _communicate_with_select(self, input):
+            read_set = []
+            write_set = []
+            stdout = None # Return
+            stderr = None # Return
+
+            if self.stdin and input:
+                write_set.append(self.stdin)
             if self.stdout:
                 read_set.append(self.stdout)
                 stdout = []
@@ -1206,10 +1296,7 @@ class Popen(object):
                 # file objects; they are no longer using C stdio!
 
                 if self.stdin in wlist:
-                    # When select has indicated that the file is writable,
-                    # we can write up to PIPE_BUF bytes without risk
-                    # blocking.  POSIX defines PIPE_BUF >= 512
-                    chunk = input[input_offset : input_offset + 512]
+                    chunk = input[input_offset : input_offset + _PIPE_BUF]
                     bytes_written = os.write(self.stdin.fileno(), chunk)
                     input_offset += bytes_written
                     if input_offset >= len(input):
@@ -1230,25 +1317,9 @@ class Popen(object):
                         read_set.remove(self.stderr)
                     stderr.append(data)
 
-            # All data exchanged.  Translate lists into strings.
-            if stdout is not None:
-                stdout = b"".join(stdout)
-            if stderr is not None:
-                stderr = b"".join(stderr)
-
-            # Translate newlines, if requested.
-            # This also turns bytes into strings.
-            if self.universal_newlines:
-                if stdout is not None:
-                    stdout = self._translate_newlines(stdout,
-                                                      self.stdout.encoding)
-                if stderr is not None:
-                    stderr = self._translate_newlines(stderr,
-                                                      self.stderr.encoding)
-
-            self.wait()
             return (stdout, stderr)
 
+
         def send_signal(self, sig):
             """Send a signal to the process
             """
index 326c996ebe49708dfc3b8ce6dc59aa78eeb3ae39..f2a396cd10fbb87c7c12e7556492880e916fc14a 100644 (file)
@@ -798,8 +798,24 @@ class CommandTests(unittest.TestCase):
                 if dir is not None:
                     os.rmdir(dir)
 
+
+unit_tests = [ProcessTestCase, CommandTests]
+
+if subprocess._has_poll:
+    class ProcessTestCaseNoPoll(ProcessTestCase):
+        def setUp(self):
+            subprocess._has_poll = False
+            ProcessTestCase.setUp(self)
+
+        def tearDown(self):
+            subprocess._has_poll = True
+            ProcessTestCase.tearDown(self)
+
+    unit_tests.append(ProcessTestCaseNoPoll)
+
+
 def test_main():
-    support.run_unittest(ProcessTestCase, CommandTests)
+    support.run_unittest(*unit_tests)
     support.reap_children()
 
 if __name__ == "__main__":
index ae39b9abc1c91c0a26e2bae704e7843821e6ad68..87949298ca7f93d5f608c2ef3d86eeb52f798a71 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -34,6 +34,9 @@ C-API
 Library
 -------
 
+- Issue #3392: The subprocess communicate() method no longer fails in select()
+  when file descriptors are large; communicate() now uses poll() when possible.
+
 - Issue #6369: Fix an RLE decompression bug in the binhex module.
 
 - Issue #6344: Fixed a crash of mmap.read() when passed a negative argument.