]> granicus.if.org Git - python/commitdiff
Use select.poll() in subprocess, when available, rather than select() so that
authorGregory P. Smith <greg@mad-scientist.com>
Sat, 4 Jul 2009 01:49:29 +0000 (01:49 +0000)
committerGregory P. Smith <greg@mad-scientist.com>
Sat, 4 Jul 2009 01:49:29 +0000 (01:49 +0000)
it does not fail when file descriptors are large.  Fixes issue3392.

Patch largely contributed by Frank Chu (fpmc) with some improvements by me.
See http://bugs.python.org/issue3392.

Candidate for backporting to release26-maint as it is a bug fix and changes no
public API.

Lib/subprocess.py
Lib/test/test_subprocess.py

index 6daeb04cd501f7a9afcaff6adbbaa17de046d0ce..c8dcb56e21e0de3faa708127d895573acd839c95 100644 (file)
@@ -413,6 +413,7 @@ if mswindows:
             error = IOError
 else:
     import select
+    _has_poll = hasattr(select, 'poll')
     import errno
     import fcntl
     import pickle
@@ -425,12 +426,10 @@ try:
 except:
     MAXFD = 256
 
-# True/False does not exist on 2.2.0
-#try:
-#    False
-#except NameError:
-#    False = 0
-#    True = 1
+# When select or poll has indicated that the file is writable,
+# we can write up to _PIPE_BUF bytes without risk of blocking.
+# POSIX defines PIPE_BUF as >= 512.
+_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
 
 _active = []
 
@@ -1191,19 +1190,100 @@ class Popen(object):
 
 
         def _communicate(self, input):
-            read_set = []
-            write_set = []
-            stdout = None # Return
-            stderr = None # Return
-
             if self.stdin:
                 # Flush stdio buffer.  This might block, if the user has
                 # been writing to .stdin in an uncontrolled fashion.
                 self.stdin.flush()
-                if input:
-                    write_set.append(self.stdin)
-                else:
+                if not input:
                     self.stdin.close()
+
+            if _has_poll:
+                stdout, stderr = self._communicate_with_poll(input)
+            else:
+                stdout, stderr = self._communicate_with_select(input)
+
+            # All data exchanged.  Translate lists into strings.
+            if stdout is not None:
+                stdout = ''.join(stdout)
+            if stderr is not None:
+                stderr = ''.join(stderr)
+
+            # Translate newlines, if requested.  We cannot let the file
+            # object do the translation: It is based on stdio, which is
+            # impossible to combine with select (unless forcing no
+            # buffering).
+            if self.universal_newlines and hasattr(file, 'newlines'):
+                if stdout:
+                    stdout = self._translate_newlines(stdout)
+                if stderr:
+                    stderr = self._translate_newlines(stderr)
+
+            self.wait()
+            return (stdout, stderr)
+
+
+        def _communicate_with_poll(self, input):
+            stdout = None # Return
+            stderr = None # Return
+            fd2file = {}
+            fd2output = {}
+
+            poller = select.poll()
+            def register_and_append(file_obj, eventmask):
+                poller.register(file_obj.fileno(), eventmask)
+                fd2file[file_obj.fileno()] = file_obj
+
+            def close_unregister_and_remove(fd):
+                poller.unregister(fd)
+                fd2file[fd].close()
+                fd2file.pop(fd)
+
+            if self.stdin and input:
+                register_and_append(self.stdin, select.POLLOUT)
+
+            select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
+            if self.stdout:
+                register_and_append(self.stdout, select_POLLIN_POLLPRI)
+                fd2output[self.stdout.fileno()] = stdout = []
+            if self.stderr:
+                register_and_append(self.stderr, select_POLLIN_POLLPRI)
+                fd2output[self.stderr.fileno()] = stderr = []
+
+            input_offset = 0
+            while fd2file:
+                try:
+                    ready = poller.poll()
+                except select.error, e:
+                    if e.args[0] == errno.EINTR:
+                        continue
+                    raise
+
+                for fd, mode in ready:
+                    if mode & select.POLLOUT:
+                        chunk = input[input_offset : input_offset + _PIPE_BUF]
+                        input_offset += os.write(fd, chunk)
+                        if input_offset >= len(input):
+                            close_unregister_and_remove(fd)
+                    elif mode & select_POLLIN_POLLPRI:
+                        data = os.read(fd, 4096)
+                        if not data:
+                            close_unregister_and_remove(fd)
+                        fd2output[fd].append(data)
+                    else:
+                        # Ignore hang up or errors.
+                        close_unregister_and_remove(fd)
+
+            return (stdout, stderr)
+
+
+        def _communicate_with_select(self, input):
+            read_set = []
+            write_set = []
+            stdout = None # Return
+            stderr = None # Return
+
+            if self.stdin and input:
+                write_set.append(self.stdin)
             if self.stdout:
                 read_set.append(self.stdout)
                 stdout = []
@@ -1221,10 +1301,7 @@ class Popen(object):
                     raise
 
                 if self.stdin in wlist:
-                    # When select has indicated that the file is writable,
-                    # we can write up to PIPE_BUF bytes without risk
-                    # blocking.  POSIX defines PIPE_BUF >= 512
-                    chunk = input[input_offset : input_offset + 512]
+                    chunk = input[input_offset : input_offset + _PIPE_BUF]
                     bytes_written = os.write(self.stdin.fileno(), chunk)
                     input_offset += bytes_written
                     if input_offset >= len(input):
@@ -1245,25 +1322,9 @@ class Popen(object):
                         read_set.remove(self.stderr)
                     stderr.append(data)
 
-            # All data exchanged.  Translate lists into strings.
-            if stdout is not None:
-                stdout = ''.join(stdout)
-            if stderr is not None:
-                stderr = ''.join(stderr)
-
-            # Translate newlines, if requested.  We cannot let the file
-            # object do the translation: It is based on stdio, which is
-            # impossible to combine with select (unless forcing no
-            # buffering).
-            if self.universal_newlines and hasattr(file, 'newlines'):
-                if stdout:
-                    stdout = self._translate_newlines(stdout)
-                if stderr:
-                    stderr = self._translate_newlines(stderr)
-
-            self.wait()
             return (stdout, stderr)
 
+
         def send_signal(self, sig):
             """Send a signal to the process
             """
index c441cdcd669ae538bdbe87a796740f24dd4f0f6e..5add023944e136beb9dbadbbdfbb0b064302b272 100644 (file)
@@ -766,8 +766,24 @@ class ProcessTestCase(unittest.TestCase):
             p.terminate()
             self.assertNotEqual(p.wait(), 0)
 
+
+unit_tests = [ProcessTestCase]
+
+if subprocess._has_poll:
+    class ProcessTestCaseNoPoll(ProcessTestCase):
+        def setUp(self):
+            subprocess._has_poll = False
+            ProcessTestCase.setUp(self)
+
+        def tearDown(self):
+            subprocess._has_poll = True
+            ProcessTestCase.tearDown(self)
+
+    unit_tests.append(ProcessTestCaseNoPoll)
+
+
 def test_main():
-    test_support.run_unittest(ProcessTestCase)
+    test_support.run_unittest(*unit_tests)
     if hasattr(test_support, "reap_children"):
         test_support.reap_children()