error = IOError
else:
import select
+ _has_poll = hasattr(select, 'poll')
import errno
import fcntl
import pickle
except:
MAXFD = 256
+# When select or poll has indicated that the file is writable,
+# we can write up to _PIPE_BUF bytes without risk of blocking.
+# POSIX defines PIPE_BUF as >= 512.
+_PIPE_BUF = getattr(select, 'PIPE_BUF', 512)
+
_active = []
def _cleanup():
def _communicate(self, input):
- read_set = []
- write_set = []
- stdout = None # Return
- stderr = None # Return
-
if self.stdin:
# Flush stdio buffer. This might block, if the user has
# been writing to .stdin in an uncontrolled fashion.
self.stdin.flush()
- if input:
- write_set.append(self.stdin)
- else:
+ if not input:
self.stdin.close()
+
+ if _has_poll:
+ stdout, stderr = self._communicate_with_poll(input)
+ else:
+ stdout, stderr = self._communicate_with_select(input)
+
+ # All data exchanged. Translate lists into strings.
+ if stdout is not None:
+ stdout = b''.join(stdout)
+ if stderr is not None:
+ stderr = b''.join(stderr)
+
+ # Translate newlines, if requested.
+ # This also turns bytes into strings.
+ if self.universal_newlines:
+ if stdout is not None:
+ stdout = self._translate_newlines(stdout,
+ self.stdout.encoding)
+ if stderr is not None:
+ stderr = self._translate_newlines(stderr,
+ self.stderr.encoding)
+
+ self.wait()
+ return (stdout, stderr)
+
+
+ def _communicate_with_poll(self, input):
+ stdout = None # Return
+ stderr = None # Return
+ fd2file = {}
+ fd2output = {}
+
+ poller = select.poll()
+ def register_and_append(file_obj, eventmask):
+ poller.register(file_obj.fileno(), eventmask)
+ fd2file[file_obj.fileno()] = file_obj
+
+ def close_unregister_and_remove(fd):
+ poller.unregister(fd)
+ fd2file[fd].close()
+ fd2file.pop(fd)
+
+ if self.stdin and input:
+ register_and_append(self.stdin, select.POLLOUT)
+
+ select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI
+ if self.stdout:
+ register_and_append(self.stdout, select_POLLIN_POLLPRI)
+ fd2output[self.stdout.fileno()] = stdout = []
+ if self.stderr:
+ register_and_append(self.stderr, select_POLLIN_POLLPRI)
+ fd2output[self.stderr.fileno()] = stderr = []
+
+ input_offset = 0
+ while fd2file:
+ try:
+ ready = poller.poll()
+ except select.error as e:
+ if e.args[0] == errno.EINTR:
+ continue
+ raise
+
+ # XXX Rewrite these to use non-blocking I/O on the
+ # file objects; they are no longer using C stdio!
+
+ for fd, mode in ready:
+ if mode & select.POLLOUT:
+ chunk = input[input_offset : input_offset + _PIPE_BUF]
+ input_offset += os.write(fd, chunk)
+ if input_offset >= len(input):
+ close_unregister_and_remove(fd)
+ elif mode & select_POLLIN_POLLPRI:
+ data = os.read(fd, 4096)
+ if not data:
+ close_unregister_and_remove(fd)
+ fd2output[fd].append(data)
+ else:
+ # Ignore hang up or errors.
+ close_unregister_and_remove(fd)
+
+ return (stdout, stderr)
+
+
+ def _communicate_with_select(self, input):
+ read_set = []
+ write_set = []
+ stdout = None # Return
+ stderr = None # Return
+
+ if self.stdin and input:
+ write_set.append(self.stdin)
if self.stdout:
read_set.append(self.stdout)
stdout = []
# file objects; they are no longer using C stdio!
if self.stdin in wlist:
- # When select has indicated that the file is writable,
- # we can write up to PIPE_BUF bytes without risk
- # blocking. POSIX defines PIPE_BUF >= 512
- chunk = input[input_offset : input_offset + 512]
+ chunk = input[input_offset : input_offset + _PIPE_BUF]
bytes_written = os.write(self.stdin.fileno(), chunk)
input_offset += bytes_written
if input_offset >= len(input):
read_set.remove(self.stderr)
stderr.append(data)
- # All data exchanged. Translate lists into strings.
- if stdout is not None:
- stdout = b"".join(stdout)
- if stderr is not None:
- stderr = b"".join(stderr)
-
- # Translate newlines, if requested.
- # This also turns bytes into strings.
- if self.universal_newlines:
- if stdout is not None:
- stdout = self._translate_newlines(stdout,
- self.stdout.encoding)
- if stderr is not None:
- stderr = self._translate_newlines(stderr,
- self.stderr.encoding)
-
- self.wait()
return (stdout, stderr)
+
def send_signal(self, sig):
"""Send a signal to the process
"""