]> granicus.if.org Git - python/commitdiff
Issue #11459: A `bufsize` value of 0 in subprocess.Popen() really creates
authorAntoine Pitrou <solipsis@pitrou.net>
Sat, 19 Mar 2011 16:04:13 +0000 (17:04 +0100)
committerAntoine Pitrou <solipsis@pitrou.net>
Sat, 19 Mar 2011 16:04:13 +0000 (17:04 +0100)
unbuffered pipes, such that select() works properly on them.

1  2 
Doc/library/platform.rst
Lib/os.py
Lib/platform.py
Lib/subprocess.py
Lib/test/test_subprocess.py
Misc/NEWS

Simple merge
diff --cc Lib/os.py
Simple merge
diff --cc Lib/platform.py
Simple merge
index 477f92782100a4995a1247adedbc5c9564f402c1,dc5b6088da63582e2982a677882af461f7cf8169..039b3e63021cc4c28b78a7fed92060b2eb7addf2
@@@ -699,21 -638,22 +699,19 @@@ class Popen(object)
           c2pread, c2pwrite,
           errread, errwrite) = self._get_handles(stdin, stdout, stderr)
  
 -        self._execute_child(args, executable, preexec_fn, close_fds,
 -                            cwd, env, universal_newlines,
 -                            startupinfo, creationflags, shell,
 -                            p2cread, p2cwrite,
 -                            c2pread, c2pwrite,
 -                            errread, errwrite)
 +        # We wrap OS handles *before* launching the child, otherwise a
 +        # quickly terminating child could make our fds unwrappable
 +        # (see #8458).
  
          if mswindows:
 -            if p2cwrite is not None:
 +            if p2cwrite != -1:
                  p2cwrite = msvcrt.open_osfhandle(p2cwrite.Detach(), 0)
 -            if c2pread is not None:
 +            if c2pread != -1:
                  c2pread = msvcrt.open_osfhandle(c2pread.Detach(), 0)
 -            if errread is not None:
 +            if errread != -1:
                  errread = msvcrt.open_osfhandle(errread.Detach(), 0)
  
-         if bufsize == 0:
-             bufsize = 1  # Nearly unbuffered (XXX for now)
 -        if p2cwrite is not None:
 +        if p2cwrite != -1:
              self.stdin = io.open(p2cwrite, 'wb', bufsize)
              if self.universal_newlines:
                  self.stdin = io.TextIOWrapper(self.stdin)
index 3cc387b6a3dd20e723793e5363b8468ddcc10a65,410849fc286ece42a48d28c1ab7b938b9205000b..01e670ef72e3f1173f68cf782e63b49fc3cc20b4
@@@ -626,752 -592,475 +626,768 @@@ class ProcessTestCase(BaseTestCase)
          self.assertFalse(os.path.exists(ofname))
          self.assertFalse(os.path.exists(efname))
  
 -    #
 -    # POSIX tests
 -    #
 -    if not mswindows:
 -        def test_exceptions(self):
 -            # caught & re-raised exceptions
 -            try:
 -                p = subprocess.Popen([sys.executable, "-c", ""],
 -                                     cwd="/this/path/does/not/exist")
 -            except OSError as e:
 -                # The attribute child_traceback should contain "os.chdir"
 -                # somewhere.
 -                self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
 -            else:
 -                self.fail("Expected OSError")
 -
 -        def _suppress_core_files(self):
 -            """Try to prevent core files from being created.
 -            Returns previous ulimit if successful, else None.
 -            """
 -            if sys.platform == 'darwin':
 -                # Check if the 'Crash Reporter' on OSX was configured
 -                # in 'Developer' mode and warn that it will get triggered
 -                # when it is.
 -                #
 -                # This assumes that this context manager is used in tests
 -                # that might trigger the next manager.
 -                value = subprocess.Popen(['/usr/bin/defaults', 'read',
 +
 +# context manager
 +class _SuppressCoreFiles(object):
 +    """Try to prevent core files from being created."""
 +    old_limit = None
 +
 +    def __enter__(self):
 +        """Try to save previous ulimit, then set it to (0, 0)."""
 +        try:
 +            import resource
 +            self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
 +            resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
 +        except (ImportError, ValueError, resource.error):
 +            pass
 +
 +        if sys.platform == 'darwin':
 +            # Check if the 'Crash Reporter' on OSX was configured
 +            # in 'Developer' mode and warn that it will get triggered
 +            # when it is.
 +            #
 +            # This assumes that this context manager is used in tests
 +            # that might trigger the next manager.
 +            value = subprocess.Popen(['/usr/bin/defaults', 'read',
                      'com.apple.CrashReporter', 'DialogType'],
                      stdout=subprocess.PIPE).communicate()[0]
 -                if value.strip() == b'developer':
 -                    print("this tests triggers the Crash Reporter, "
 -                          "that is intentional", end='')
 -                    sys.stdout.flush()
 +            if value.strip() == b'developer':
 +                print("this tests triggers the Crash Reporter, "
 +                      "that is intentional", end='')
 +                sys.stdout.flush()
 +
 +    def __exit__(self, *args):
 +        """Return core file behavior to default."""
 +        if self.old_limit is None:
 +            return
 +        try:
 +            import resource
 +            resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
 +        except (ImportError, ValueError, resource.error):
 +            pass
  
 -            try:
 -                import resource
 -                old_limit = resource.getrlimit(resource.RLIMIT_CORE)
 -                resource.setrlimit(resource.RLIMIT_CORE, (0,0))
 -                return old_limit
 -            except (ImportError, ValueError, resource.error):
 -                return None
  
 +@unittest.skipIf(mswindows, "POSIX specific tests")
 +class POSIXProcessTestCase(BaseTestCase):
  
 +    def test_exceptions(self):
 +        nonexistent_dir = "/_this/pa.th/does/not/exist"
 +        try:
 +            os.chdir(nonexistent_dir)
 +        except OSError as e:
 +            # This avoids hard coding the errno value or the OS perror()
 +            # string and instead capture the exception that we want to see
 +            # below for comparison.
 +            desired_exception = e
 +            desired_exception.strerror += ': ' + repr(sys.executable)
 +        else:
 +            self.fail("chdir to nonexistant directory %s succeeded." %
 +                      nonexistent_dir)
  
 -        def _unsuppress_core_files(self, old_limit):
 -            """Return core file behavior to default."""
 -            if old_limit is None:
 -                return
 -            try:
 -                import resource
 -                resource.setrlimit(resource.RLIMIT_CORE, old_limit)
 -            except (ImportError, ValueError, resource.error):
 -                return
 -
 -        def test_run_abort(self):
 -            # returncode handles signal termination
 -            old_limit = self._suppress_core_files()
 -            try:
 -                p = subprocess.Popen([sys.executable,
 -                                      "-c", "import os; os.abort()"])
 -            finally:
 -                self._unsuppress_core_files(old_limit)
 -            p.wait()
 -            self.assertEqual(-p.returncode, signal.SIGABRT)
 +        # Error in the child re-raised in the parent.
 +        try:
 +            p = subprocess.Popen([sys.executable, "-c", ""],
 +                                 cwd=nonexistent_dir)
 +        except OSError as e:
 +            # Test that the child process chdir failure actually makes
 +            # it up to the parent process as the correct exception.
 +            self.assertEqual(desired_exception.errno, e.errno)
 +            self.assertEqual(desired_exception.strerror, e.strerror)
 +        else:
 +            self.fail("Expected OSError: %s" % desired_exception)
 +
 +    def test_restore_signals(self):
 +        # Code coverage for both values of restore_signals to make sure it
 +        # at least does not blow up.
 +        # A test for behavior would be complex.  Contributions welcome.
 +        subprocess.call([sys.executable, "-c", ""], restore_signals=True)
 +        subprocess.call([sys.executable, "-c", ""], restore_signals=False)
 +
 +    def test_start_new_session(self):
 +        # For code coverage of calling setsid().  We don't care if we get an
 +        # EPERM error from it depending on the test execution environment, that
 +        # still indicates that it was called.
 +        try:
 +            output = subprocess.check_output(
 +                    [sys.executable, "-c",
 +                     "import os; print(os.getpgid(os.getpid()))"],
 +                    start_new_session=True)
 +        except OSError as e:
 +            if e.errno != errno.EPERM:
 +                raise
 +        else:
 +            parent_pgid = os.getpgid(os.getpid())
 +            child_pgid = int(output)
 +            self.assertNotEqual(parent_pgid, child_pgid)
  
 -        def test_preexec(self):
 -            # preexec function
 +    def test_run_abort(self):
 +        # returncode handles signal termination
 +        with _SuppressCoreFiles():
              p = subprocess.Popen([sys.executable, "-c",
 -                                  'import sys,os;'
 -                                  'sys.stdout.write(os.getenv("FRUIT"))'],
 -                                 stdout=subprocess.PIPE,
 -                                 preexec_fn=lambda: os.putenv("FRUIT",
 -                                                              "apple"))
 -            self.assertEqual(p.stdout.read(), b"apple")
 -
 -        def test_args_string(self):
 -            # args is a string
 -            fd, fname = self.mkstemp()
 -            # reopen in text mode
 -            with open(fd, "w") as fobj:
 -                fobj.write("#!/bin/sh\n")
 -                fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
 -                            sys.executable)
 -            os.chmod(fname, 0o700)
 -            p = subprocess.Popen(fname)
 +                                  'import os; os.abort()'])
              p.wait()
 -            os.remove(fname)
 -            self.assertEqual(p.returncode, 47)
 -
 -        def test_invalid_args(self):
 -            # invalid arguments should raise ValueError
 -            self.assertRaises(ValueError, subprocess.call,
 -                              [sys.executable,
 -                               "-c", "import sys; sys.exit(47)"],
 -                              startupinfo=47)
 -            self.assertRaises(ValueError, subprocess.call,
 -                              [sys.executable,
 -                               "-c", "import sys; sys.exit(47)"],
 -                              creationflags=47)
 -
 -        def test_shell_sequence(self):
 -            # Run command through the shell (sequence)
 -            newenv = os.environ.copy()
 -            newenv["FRUIT"] = "apple"
 -            p = subprocess.Popen(["echo $FRUIT"], shell=1,
 -                                 stdout=subprocess.PIPE,
 -                                 env=newenv)
 -            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
 -
 -        def test_shell_string(self):
 -            # Run command through the shell (string)
 -            newenv = os.environ.copy()
 -            newenv["FRUIT"] = "apple"
 -            p = subprocess.Popen("echo $FRUIT", shell=1,
 -                                 stdout=subprocess.PIPE,
 -                                 env=newenv)
 -            self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
 -
 -        def test_call_string(self):
 -            # call() function with string argument on UNIX
 -            fd, fname = self.mkstemp()
 -            # reopen in text mode
 -            with open(fd, "w") as fobj:
 -                fobj.write("#!/bin/sh\n")
 -                fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
 -                            sys.executable)
 -            os.chmod(fname, 0o700)
 -            rc = subprocess.call(fname)
 -            os.remove(fname)
 -            self.assertEqual(rc, 47)
 -
 -        def test_specific_shell(self):
 -            # Issue #9265: Incorrect name passed as arg[0].
 -            shells = []
 -            for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
 -                for name in ['bash', 'ksh']:
 -                    sh = os.path.join(prefix, name)
 -                    if os.path.isfile(sh):
 -                        shells.append(sh)
 -            if not shells: # Will probably work for any shell but csh.
 -                self.skipTest("bash or ksh required for this test")
 -            sh = '/bin/sh'
 -            if os.path.isfile(sh) and not os.path.islink(sh):
 -                # Test will fail if /bin/sh is a symlink to csh.
 -                shells.append(sh)
 -            for sh in shells:
 -                p = subprocess.Popen("echo $0", executable=sh, shell=True,
 -                                     stdout=subprocess.PIPE)
 -                self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
 -
 -        def DISABLED_test_send_signal(self):
 -            p = subprocess.Popen([sys.executable,
 -                              "-c", "input()"])
 -
 -            self.assertTrue(p.poll() is None, p.poll())
 -            p.send_signal(signal.SIGINT)
 -            self.assertNotEqual(p.wait(), 0)
 -
 -        def DISABLED_test_kill(self):
 -            p = subprocess.Popen([sys.executable,
 -                            "-c", "input()"])
 -
 -            self.assertTrue(p.poll() is None, p.poll())
 -            p.kill()
 -            self.assertEqual(p.wait(), -signal.SIGKILL)
 -
 -        def DISABLED_test_terminate(self):
 -            p = subprocess.Popen([sys.executable,
 -                            "-c", "input()"])
 -
 -            self.assertTrue(p.poll() is None, p.poll())
 -            p.terminate()
 -            self.assertEqual(p.wait(), -signal.SIGTERM)
 -
 -        def test_undecodable_env(self):
 -            for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
 -                value_repr = ascii(value).encode("ascii")
 -
 -                # test str with surrogates
 -                script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
 -                env = os.environ.copy()
 -                env[key] = value
 -                # Force surrogate-escaping of \xFF in the child process;
 -                # otherwise it can be decoded as-is if the default locale
 -                # is latin-1.
 -                env['PYTHONFSENCODING'] = 'ascii'
 -                stdout = subprocess.check_output(
 -                    [sys.executable, "-c", script],
 -                    env=env)
 -                stdout = stdout.rstrip(b'\n\r')
 -                self.assertEqual(stdout, value_repr)
 -
 -                # test bytes
 -                key = key.encode("ascii", "surrogateescape")
 -                value = value.encode("ascii", "surrogateescape")
 -                script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
 -                env = os.environ.copy()
 -                env[key] = value
 -                stdout = subprocess.check_output(
 -                    [sys.executable, "-c", script],
 -                    env=env)
 -                stdout = stdout.rstrip(b'\n\r')
 -                self.assertEqual(stdout, value_repr)
 -
 -        def test_wait_when_sigchild_ignored(self):
 -            # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
 -            sigchild_ignore = support.findfile("sigchild_ignore.py",
 -                                               subdir="subprocessdata")
 -            p = subprocess.Popen([sys.executable, sigchild_ignore],
 -                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 -            stdout, stderr = p.communicate()
 -            self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
 -                             " non-zero with this error:\n%s" %
 -                             stderr.decode('utf8'))
 -
 -        def check_close_std_fds(self, fds):
 -            # Issue #9905: test that subprocess pipes still work properly with
 -            # some standard fds closed
 -            stdin = 0
 -            newfds = []
 -            for a in fds:
 -                b = os.dup(a)
 -                newfds.append(b)
 -                if a == 0:
 -                    stdin = b
 -            try:
 -                for fd in fds:
 -                    os.close(fd)
 -                out, err = subprocess.Popen([sys.executable, "-c",
 -                                  'import sys;'
 -                                  'sys.stdout.write("apple");'
 -                                  'sys.stdout.flush();'
 -                                  'sys.stderr.write("orange")'],
 -                           stdin=stdin,
 -                           stdout=subprocess.PIPE,
 -                           stderr=subprocess.PIPE).communicate()
 -                err = support.strip_python_stderr(err)
 -                self.assertEqual((out, err), (b'apple', b'orange'))
 -            finally:
 -                for b, a in zip(newfds, fds):
 -                    os.dup2(b, a)
 -                for b in newfds:
 -                    os.close(b)
 +        self.assertEqual(-p.returncode, signal.SIGABRT)
  
 -        def test_close_fd_0(self):
 -            self.check_close_std_fds([0])
 +    def test_preexec(self):
 +        # DISCLAIMER: Setting environment variables is *not* a good use
 +        # of a preexec_fn.  This is merely a test.
 +        p = subprocess.Popen([sys.executable, "-c",
 +                              'import sys,os;'
 +                              'sys.stdout.write(os.getenv("FRUIT"))'],
 +                             stdout=subprocess.PIPE,
 +                             preexec_fn=lambda: os.putenv("FRUIT", "apple"))
 +        self.addCleanup(p.stdout.close)
 +        self.assertEqual(p.stdout.read(), b"apple")
  
 -        def test_close_fd_1(self):
 -            self.check_close_std_fds([1])
 +    def test_preexec_exception(self):
 +        def raise_it():
 +            raise ValueError("What if two swallows carried a coconut?")
 +        try:
 +            p = subprocess.Popen([sys.executable, "-c", ""],
 +                                 preexec_fn=raise_it)
 +        except RuntimeError as e:
 +            self.assertTrue(
 +                    subprocess._posixsubprocess,
 +                    "Expected a ValueError from the preexec_fn")
 +        except ValueError as e:
 +            self.assertIn("coconut", e.args[0])
 +        else:
 +            self.fail("Exception raised by preexec_fn did not make it "
 +                      "to the parent process.")
 +
 +    @unittest.skipUnless(gc, "Requires a gc module.")
 +    def test_preexec_gc_module_failure(self):
 +        # This tests the code that disables garbage collection if the child
 +        # process will execute any Python.
 +        def raise_runtime_error():
 +            raise RuntimeError("this shouldn't escape")
 +        enabled = gc.isenabled()
 +        orig_gc_disable = gc.disable
 +        orig_gc_isenabled = gc.isenabled
 +        try:
 +            gc.disable()
 +            self.assertFalse(gc.isenabled())
 +            subprocess.call([sys.executable, '-c', ''],
 +                            preexec_fn=lambda: None)
 +            self.assertFalse(gc.isenabled(),
 +                             "Popen enabled gc when it shouldn't.")
 +
 +            gc.enable()
 +            self.assertTrue(gc.isenabled())
 +            subprocess.call([sys.executable, '-c', ''],
 +                            preexec_fn=lambda: None)
 +            self.assertTrue(gc.isenabled(), "Popen left gc disabled.")
 +
 +            gc.disable = raise_runtime_error
 +            self.assertRaises(RuntimeError, subprocess.Popen,
 +                              [sys.executable, '-c', ''],
 +                              preexec_fn=lambda: None)
 +
 +            del gc.isenabled  # force an AttributeError
 +            self.assertRaises(AttributeError, subprocess.Popen,
 +                              [sys.executable, '-c', ''],
 +                              preexec_fn=lambda: None)
 +        finally:
 +            gc.disable = orig_gc_disable
 +            gc.isenabled = orig_gc_isenabled
 +            if not enabled:
 +                gc.disable()
 +
 +    def test_args_string(self):
 +        # args is a string
 +        fd, fname = mkstemp()
 +        # reopen in text mode
 +        with open(fd, "w", errors="surrogateescape") as fobj:
 +            fobj.write("#!/bin/sh\n")
 +            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
 +                       sys.executable)
 +        os.chmod(fname, 0o700)
 +        p = subprocess.Popen(fname)
 +        p.wait()
 +        os.remove(fname)
 +        self.assertEqual(p.returncode, 47)
  
 -        def test_close_fd_2(self):
 -            self.check_close_std_fds([2])
 +    def test_invalid_args(self):
 +        # invalid arguments should raise ValueError
 +        self.assertRaises(ValueError, subprocess.call,
 +                          [sys.executable, "-c",
 +                           "import sys; sys.exit(47)"],
 +                          startupinfo=47)
 +        self.assertRaises(ValueError, subprocess.call,
 +                          [sys.executable, "-c",
 +                           "import sys; sys.exit(47)"],
 +                          creationflags=47)
 +
 +    def test_shell_sequence(self):
 +        # Run command through the shell (sequence)
 +        newenv = os.environ.copy()
 +        newenv["FRUIT"] = "apple"
 +        p = subprocess.Popen(["echo $FRUIT"], shell=1,
 +                             stdout=subprocess.PIPE,
 +                             env=newenv)
 +        self.addCleanup(p.stdout.close)
 +        self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
  
 -        def test_close_fds_0_1(self):
 -            self.check_close_std_fds([0, 1])
 +    def test_shell_string(self):
 +        # Run command through the shell (string)
 +        newenv = os.environ.copy()
 +        newenv["FRUIT"] = "apple"
 +        p = subprocess.Popen("echo $FRUIT", shell=1,
 +                             stdout=subprocess.PIPE,
 +                             env=newenv)
 +        self.addCleanup(p.stdout.close)
 +        self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
 +
 +    def test_call_string(self):
 +        # call() function with string argument on UNIX
 +        fd, fname = mkstemp()
 +        # reopen in text mode
 +        with open(fd, "w", errors="surrogateescape") as fobj:
 +            fobj.write("#!/bin/sh\n")
 +            fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
 +                       sys.executable)
 +        os.chmod(fname, 0o700)
 +        rc = subprocess.call(fname)
 +        os.remove(fname)
 +        self.assertEqual(rc, 47)
  
 -        def test_close_fds_0_2(self):
 -            self.check_close_std_fds([0, 2])
 +    def test_specific_shell(self):
 +        # Issue #9265: Incorrect name passed as arg[0].
 +        shells = []
 +        for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
 +            for name in ['bash', 'ksh']:
 +                sh = os.path.join(prefix, name)
 +                if os.path.isfile(sh):
 +                    shells.append(sh)
 +        if not shells: # Will probably work for any shell but csh.
 +            self.skipTest("bash or ksh required for this test")
 +        sh = '/bin/sh'
 +        if os.path.isfile(sh) and not os.path.islink(sh):
 +            # Test will fail if /bin/sh is a symlink to csh.
 +            shells.append(sh)
 +        for sh in shells:
 +            p = subprocess.Popen("echo $0", executable=sh, shell=True,
 +                                 stdout=subprocess.PIPE)
 +            self.addCleanup(p.stdout.close)
 +            self.assertEqual(p.stdout.read().strip(), bytes(sh, 'ascii'))
 +
 +    def _kill_process(self, method, *args):
 +        # Do not inherit file handles from the parent.
 +        # It should fix failures on some platforms.
 +        p = subprocess.Popen([sys.executable, "-c", """if 1:
 +                             import sys, time
 +                             sys.stdout.write('x\\n')
 +                             sys.stdout.flush()
 +                             time.sleep(30)
 +                             """],
 +                             close_fds=True,
 +                             stdin=subprocess.PIPE,
 +                             stdout=subprocess.PIPE,
 +                             stderr=subprocess.PIPE)
 +        # Wait for the interpreter to be completely initialized before
 +        # sending any signal.
 +        p.stdout.read(1)
 +        getattr(p, method)(*args)
 +        return p
 +
 +    def test_send_signal(self):
 +        p = self._kill_process('send_signal', signal.SIGINT)
 +        _, stderr = p.communicate()
 +        self.assertIn(b'KeyboardInterrupt', stderr)
 +        self.assertNotEqual(p.wait(), 0)
 +
 +    def test_kill(self):
 +        p = self._kill_process('kill')
 +        _, stderr = p.communicate()
 +        self.assertStderrEqual(stderr, b'')
 +        self.assertEqual(p.wait(), -signal.SIGKILL)
 +
 +    def test_terminate(self):
 +        p = self._kill_process('terminate')
 +        _, stderr = p.communicate()
 +        self.assertStderrEqual(stderr, b'')
 +        self.assertEqual(p.wait(), -signal.SIGTERM)
 +
 +    def check_close_std_fds(self, fds):
 +        # Issue #9905: test that subprocess pipes still work properly with
 +        # some standard fds closed
 +        stdin = 0
 +        newfds = []
 +        for a in fds:
 +            b = os.dup(a)
 +            newfds.append(b)
 +            if a == 0:
 +                stdin = b
 +        try:
 +            for fd in fds:
 +                os.close(fd)
 +            out, err = subprocess.Popen([sys.executable, "-c",
 +                              'import sys;'
 +                              'sys.stdout.write("apple");'
 +                              'sys.stdout.flush();'
 +                              'sys.stderr.write("orange")'],
 +                       stdin=stdin,
 +                       stdout=subprocess.PIPE,
 +                       stderr=subprocess.PIPE).communicate()
 +            err = support.strip_python_stderr(err)
 +            self.assertEqual((out, err), (b'apple', b'orange'))
 +        finally:
 +            for b, a in zip(newfds, fds):
 +                os.dup2(b, a)
 +            for b in newfds:
 +                os.close(b)
  
 -        def test_close_fds_1_2(self):
 -            self.check_close_std_fds([1, 2])
 +    def test_close_fd_0(self):
 +        self.check_close_std_fds([0])
  
 -        def test_close_fds_0_1_2(self):
 -            # Issue #10806: test that subprocess pipes still work properly with
 -            # all standard fds closed.
 -            self.check_close_std_fds([0, 1, 2])
 +    def test_close_fd_1(self):
 +        self.check_close_std_fds([1])
  
 -        def test_surrogates_error_message(self):
 -            def prepare():
 -                raise ValueError("surrogate:\uDCff")
 +    def test_close_fd_2(self):
 +        self.check_close_std_fds([2])
  
 -        def test_select_unbuffered(self):
 -            # Issue #11459: bufsize=0 should really set the pipes as
 -            # unbuffered (and therefore let select() work properly).
 -            select = support.import_module("select")
 -            p = subprocess.Popen([sys.executable, "-c",
 -                                  'import sys;'
 -                                  'sys.stdout.write("apple")'],
 -                                 stdout=subprocess.PIPE,
 -                                 bufsize=0)
 -            f = p.stdout
 +    def test_close_fds_0_1(self):
 +        self.check_close_std_fds([0, 1])
 +
 +    def test_close_fds_0_2(self):
 +        self.check_close_std_fds([0, 2])
 +
 +    def test_close_fds_1_2(self):
 +        self.check_close_std_fds([1, 2])
 +
 +    def test_close_fds_0_1_2(self):
 +        # Issue #10806: test that subprocess pipes still work properly with
 +        # all standard fds closed.
 +        self.check_close_std_fds([0, 1, 2])
 +
 +    def test_remapping_std_fds(self):
 +        # open up some temporary files
 +        temps = [mkstemp() for i in range(3)]
 +        try:
 +            temp_fds = [fd for fd, fname in temps]
 +
 +            # unlink the files -- we won't need to reopen them
 +            for fd, fname in temps:
 +                os.unlink(fname)
 +
 +            # write some data to what will become stdin, and rewind
 +            os.write(temp_fds[1], b"STDIN")
 +            os.lseek(temp_fds[1], 0, 0)
 +
 +            # move the standard file descriptors out of the way
 +            saved_fds = [os.dup(fd) for fd in range(3)]
              try:
 -                self.assertEqual(f.read(4), b"appl")
 -                self.assertIn(f, select.select([f], [], [], 0.0)[0])
 -            finally:
 +                # duplicate the file objects over the standard fd's
 +                for fd, temp_fd in enumerate(temp_fds):
 +                    os.dup2(temp_fd, fd)
 +
 +                # now use those files in the "wrong" order, so that subprocess
 +                # has to rearrange them in the child
 +                p = subprocess.Popen([sys.executable, "-c",
 +                    'import sys; got = sys.stdin.read();'
 +                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
 +                    stdin=temp_fds[1],
 +                    stdout=temp_fds[2],
 +                    stderr=temp_fds[0])
                  p.wait()
 +            finally:
 +                # restore the original fd's underneath sys.stdin, etc.
 +                for std, saved in enumerate(saved_fds):
 +                    os.dup2(saved, std)
 +                    os.close(saved)
  
 -    #
 -    # Windows tests
 -    #
 -    if mswindows:
 -        def test_startupinfo(self):
 -            # startupinfo argument
 -            # We uses hardcoded constants, because we do not want to
 -            # depend on win32all.
 -            STARTF_USESHOWWINDOW = 1
 -            SW_MAXIMIZE = 3
 -            startupinfo = subprocess.STARTUPINFO()
 -            startupinfo.dwFlags = STARTF_USESHOWWINDOW
 -            startupinfo.wShowWindow = SW_MAXIMIZE
 -            # Since Python is a console process, it won't be affected
 -            # by wShowWindow, but the argument should be silently
 -            # ignored
 -            subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
 -                        startupinfo=startupinfo)
 +            for fd in temp_fds:
 +                os.lseek(fd, 0, 0)
  
 -        def test_creationflags(self):
 -            # creationflags argument
 -            CREATE_NEW_CONSOLE = 16
 -            sys.stderr.write("    a DOS box should flash briefly ...\n")
 -            subprocess.call(sys.executable +
 -                                ' -c "import time; time.sleep(0.25)"',
 -                            creationflags=CREATE_NEW_CONSOLE)
 -
 -        def test_invalid_args(self):
 -            # invalid arguments should raise ValueError
 -            self.assertRaises(ValueError, subprocess.call,
 -                              [sys.executable,
 -                               "-c", "import sys; sys.exit(47)"],
 -                              preexec_fn=lambda: 1)
 -            self.assertRaises(ValueError, subprocess.call,
 -                              [sys.executable,
 -                               "-c", "import sys; sys.exit(47)"],
 -                              stdout=subprocess.PIPE,
 -                              close_fds=True)
 +            out = os.read(temp_fds[2], 1024)
 +            err = support.strip_python_stderr(os.read(temp_fds[0], 1024))
 +            self.assertEqual(out, b"got STDIN")
 +            self.assertEqual(err, b"err")
  
 -        def test_close_fds(self):
 -            # close file descriptors
 -            rc = subprocess.call([sys.executable, "-c",
 -                                  "import sys; sys.exit(47)"],
 -                                  close_fds=True)
 -            self.assertEqual(rc, 47)
 -
 -        def test_shell_sequence(self):
 -            # Run command through the shell (sequence)
 -            newenv = os.environ.copy()
 -            newenv["FRUIT"] = "physalis"
 -            p = subprocess.Popen(["set"], shell=1,
 -                                 stdout=subprocess.PIPE,
 -                                 env=newenv)
 -            self.assertNotEqual(p.stdout.read().find(b"physalis"), -1)
 -
 -        def test_shell_string(self):
 -            # Run command through the shell (string)
 -            newenv = os.environ.copy()
 -            newenv["FRUIT"] = "physalis"
 -            p = subprocess.Popen("set", shell=1,
 -                                 stdout=subprocess.PIPE,
 -                                 env=newenv)
 -            self.assertNotEqual(p.stdout.read().find(b"physalis"), -1)
 +        finally:
 +            for fd in temp_fds:
 +                os.close(fd)
  
 -        def test_call_string(self):
 -            # call() function with string argument on Windows
 -            rc = subprocess.call(sys.executable +
 -                                 ' -c "import sys; sys.exit(47)"')
 -            self.assertEqual(rc, 47)
 +    def test_surrogates_error_message(self):
 +        def prepare():
 +            raise ValueError("surrogate:\uDCff")
  
 -        def DISABLED_test_send_signal(self):
 -            p = subprocess.Popen([sys.executable,
 -                              "-c", "input()"])
 +        try:
 +            subprocess.call(
 +                [sys.executable, "-c", "pass"],
 +                preexec_fn=prepare)
 +        except ValueError as err:
 +            # Pure Python implementations keeps the message
 +            self.assertIsNone(subprocess._posixsubprocess)
 +            self.assertEqual(str(err), "surrogate:\uDCff")
 +        except RuntimeError as err:
 +            # _posixsubprocess uses a default message
 +            self.assertIsNotNone(subprocess._posixsubprocess)
 +            self.assertEqual(str(err), "Exception occurred in preexec_fn.")
 +        else:
 +            self.fail("Expected ValueError or RuntimeError")
 +
 +    def test_undecodable_env(self):
 +        for key, value in (('test', 'abc\uDCFF'), ('test\uDCFF', '42')):
 +            # test str with surrogates
 +            script = "import os; print(ascii(os.getenv(%s)))" % repr(key)
 +            env = os.environ.copy()
 +            env[key] = value
 +            # Use C locale to get ascii for the locale encoding to force
 +            # surrogate-escaping of \xFF in the child process; otherwise it can
 +            # be decoded as-is if the default locale is latin-1.
 +            env['LC_ALL'] = 'C'
 +            stdout = subprocess.check_output(
 +                [sys.executable, "-c", script],
 +                env=env)
 +            stdout = stdout.rstrip(b'\n\r')
 +            self.assertEqual(stdout.decode('ascii'), ascii(value))
 +
 +            # test bytes
 +            key = key.encode("ascii", "surrogateescape")
 +            value = value.encode("ascii", "surrogateescape")
 +            script = "import os; print(ascii(os.getenvb(%s)))" % repr(key)
 +            env = os.environ.copy()
 +            env[key] = value
 +            stdout = subprocess.check_output(
 +                [sys.executable, "-c", script],
 +                env=env)
 +            stdout = stdout.rstrip(b'\n\r')
 +            self.assertEqual(stdout.decode('ascii'), ascii(value))
 +
 +    def test_bytes_program(self):
 +        abs_program = os.fsencode(sys.executable)
 +        path, program = os.path.split(sys.executable)
 +        program = os.fsencode(program)
 +
 +        # absolute bytes path
 +        exitcode = subprocess.call([abs_program, "-c", "pass"])
 +        self.assertEqual(exitcode, 0)
 +
 +        # bytes program, unicode PATH
 +        env = os.environ.copy()
 +        env["PATH"] = path
 +        exitcode = subprocess.call([program, "-c", "pass"], env=env)
 +        self.assertEqual(exitcode, 0)
 +
 +        # bytes program, bytes PATH
 +        envb = os.environb.copy()
 +        envb[b"PATH"] = os.fsencode(path)
 +        exitcode = subprocess.call([program, "-c", "pass"], env=envb)
 +        self.assertEqual(exitcode, 0)
 +
 +    def test_pipe_cloexec(self):
 +        sleeper = support.findfile("input_reader.py", subdir="subprocessdata")
 +        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
 +
 +        p1 = subprocess.Popen([sys.executable, sleeper],
 +                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
 +                              stderr=subprocess.PIPE, close_fds=False)
 +
 +        self.addCleanup(p1.communicate, b'')
 +
 +        p2 = subprocess.Popen([sys.executable, fd_status],
 +                              stdout=subprocess.PIPE, close_fds=False)
 +
 +        output, error = p2.communicate()
 +        result_fds = set(map(int, output.split(b',')))
 +        unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(),
 +                            p1.stderr.fileno()])
 +
 +        self.assertFalse(result_fds & unwanted_fds,
 +                         "Expected no fds from %r to be open in child, "
 +                         "found %r" %
 +                              (unwanted_fds, result_fds & unwanted_fds))
 +
 +    def test_pipe_cloexec_real_tools(self):
 +        qcat = support.findfile("qcat.py", subdir="subprocessdata")
 +        qgrep = support.findfile("qgrep.py", subdir="subprocessdata")
 +
 +        subdata = b'zxcvbn'
 +        data = subdata * 4 + b'\n'
 +
 +        p1 = subprocess.Popen([sys.executable, qcat],
 +                              stdin=subprocess.PIPE, stdout=subprocess.PIPE,
 +                              close_fds=False)
 +
 +        p2 = subprocess.Popen([sys.executable, qgrep, subdata],
 +                              stdin=p1.stdout, stdout=subprocess.PIPE,
 +                              close_fds=False)
 +
 +        self.addCleanup(p1.wait)
 +        self.addCleanup(p2.wait)
 +        self.addCleanup(p1.terminate)
 +        self.addCleanup(p2.terminate)
 +
 +        p1.stdin.write(data)
 +        p1.stdin.close()
 +
 +        readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10)
 +
 +        self.assertTrue(readfiles, "The child hung")
 +        self.assertEqual(p2.stdout.read(), data)
 +
 +        p1.stdout.close()
 +        p2.stdout.close()
 +
 +    def test_close_fds(self):
 +        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
 +
 +        fds = os.pipe()
 +        self.addCleanup(os.close, fds[0])
 +        self.addCleanup(os.close, fds[1])
 +
 +        open_fds = set(fds)
 +
 +        p = subprocess.Popen([sys.executable, fd_status],
 +                             stdout=subprocess.PIPE, close_fds=False)
 +        output, ignored = p.communicate()
 +        remaining_fds = set(map(int, output.split(b',')))
 +
 +        self.assertEqual(remaining_fds & open_fds, open_fds,
 +                         "Some fds were closed")
 +
 +        p = subprocess.Popen([sys.executable, fd_status],
 +                             stdout=subprocess.PIPE, close_fds=True)
 +        output, ignored = p.communicate()
 +        remaining_fds = set(map(int, output.split(b',')))
 +
 +        self.assertFalse(remaining_fds & open_fds,
 +                         "Some fds were left open")
 +        self.assertIn(1, remaining_fds, "Subprocess failed")
 +
 +    def test_pass_fds(self):
 +        fd_status = support.findfile("fd_status.py", subdir="subprocessdata")
  
 -            self.assertTrue(p.poll() is None, p.poll())
 -            p.send_signal(signal.SIGTERM)
 -            self.assertNotEqual(p.wait(), 0)
 +        open_fds = set()
  
 -        def DISABLED_test_kill(self):
 -            p = subprocess.Popen([sys.executable,
 -                            "-c", "input()"])
 +        for x in range(5):
 +            fds = os.pipe()
 +            self.addCleanup(os.close, fds[0])
 +            self.addCleanup(os.close, fds[1])
 +            open_fds.update(fds)
  
 -            self.assertTrue(p.poll() is None, p.poll())
 -            p.kill()
 -            self.assertNotEqual(p.wait(), 0)
 +        for fd in open_fds:
 +            p = subprocess.Popen([sys.executable, fd_status],
 +                                 stdout=subprocess.PIPE, close_fds=True,
 +                                 pass_fds=(fd, ))
 +            output, ignored = p.communicate()
  
 -        def DISABLED_test_terminate(self):
 -            p = subprocess.Popen([sys.executable,
 -                            "-c", "input()"])
 +            remaining_fds = set(map(int, output.split(b',')))
 +            to_be_closed = open_fds - {fd}
  
 -            self.assertTrue(p.poll() is None, p.poll())
 -            p.terminate()
 -            self.assertNotEqual(p.wait(), 0)
 +            self.assertIn(fd, remaining_fds, "fd to be passed not passed")
 +            self.assertFalse(remaining_fds & to_be_closed,
 +                             "fd to be closed passed")
  
 +            # pass_fds overrides close_fds with a warning.
 +            with self.assertWarns(RuntimeWarning) as context:
 +                self.assertFalse(subprocess.call(
 +                        [sys.executable, "-c", "import sys; sys.exit(0)"],
 +                        close_fds=False, pass_fds=(fd, )))
 +            self.assertIn('overriding close_fds', str(context.warning))
  
 -class CommandTests(unittest.TestCase):
 -# The module says:
 -#   "NB This only works (and is only relevant) for UNIX."
 -#
 -# Actually, getoutput should work on any platform with an os.popen, but
 -# I'll take the comment as given, and skip this suite.
 -    if os.name == 'posix':
 +    def test_stdout_stdin_are_single_inout_fd(self):
 +        with io.open(os.devnull, "r+") as inout:
 +            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
 +                                 stdout=inout, stdin=inout)
 +            p.wait()
  
 -        def test_getoutput(self):
 -            self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
 -            self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
 -                             (0, 'xyzzy'))
 +    def test_stdout_stderr_are_single_inout_fd(self):
 +        with io.open(os.devnull, "r+") as inout:
 +            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
 +                                 stdout=inout, stderr=inout)
 +            p.wait()
  
 -            # we use mkdtemp in the next line to create an empty directory
 -            # under our exclusive control; from that, we can invent a pathname
 -            # that we _know_ won't exist.  This is guaranteed to fail.
 -            dir = None
 -            try:
 -                dir = tempfile.mkdtemp()
 -                name = os.path.join(dir, "foo")
 +    def test_stderr_stdin_are_single_inout_fd(self):
 +        with io.open(os.devnull, "r+") as inout:
 +            p = subprocess.Popen([sys.executable, "-c", "import sys; sys.exit(0)"],
 +                                 stderr=inout, stdin=inout)
 +            p.wait()
  
 -                status, output = subprocess.getstatusoutput('cat ' + name)
 -                self.assertNotEqual(status, 0)
 -            finally:
 -                if dir is not None:
 -                    os.rmdir(dir)
 +    def test_wait_when_sigchild_ignored(self):
 +        # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
 +        sigchild_ignore = support.findfile("sigchild_ignore.py",
 +                                           subdir="subprocessdata")
 +        p = subprocess.Popen([sys.executable, sigchild_ignore],
 +                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
 +        stdout, stderr = p.communicate()
 +        self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
 +                         " non-zero with this error:\n%s" %
 +                         stderr.decode('utf8'))
 +
++    def test_select_unbuffered(self):
++        # Issue #11459: bufsize=0 should really set the pipes as
++        # unbuffered (and therefore let select() work properly).
++        select = support.import_module("select")
++        p = subprocess.Popen([sys.executable, "-c",
++                              'import sys;'
++                              'sys.stdout.write("apple")'],
++                             stdout=subprocess.PIPE,
++                             bufsize=0)
++        f = p.stdout
++        try:
++            self.assertEqual(f.read(4), b"appl")
++            self.assertIn(f, select.select([f], [], [], 0.0)[0])
++        finally:
++            p.wait()
  
 -unit_tests = [ProcessTestCase, CommandTests]
 +@unittest.skipUnless(mswindows, "Windows specific tests")
 +class Win32ProcessTestCase(BaseTestCase):
 +
 +    def test_startupinfo(self):
 +        # startupinfo argument
 +        # We uses hardcoded constants, because we do not want to
 +        # depend on win32all.
 +        STARTF_USESHOWWINDOW = 1
 +        SW_MAXIMIZE = 3
 +        startupinfo = subprocess.STARTUPINFO()
 +        startupinfo.dwFlags = STARTF_USESHOWWINDOW
 +        startupinfo.wShowWindow = SW_MAXIMIZE
 +        # Since Python is a console process, it won't be affected
 +        # by wShowWindow, but the argument should be silently
 +        # ignored
 +        subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
 +                        startupinfo=startupinfo)
  
 -if mswindows:
 -    class CommandsWithSpaces (BaseTestCase):
 -
 -        def setUp(self):
 -            super().setUp()
 -            f, fname = self.mkstemp(".py", "te st")
 -            self.fname = fname.lower ()
 -            os.write(f, b"import sys;"
 -                        b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
 -            )
 -            os.close(f)
 -
 -        def tearDown(self):
 -            os.remove(self.fname)
 -            super().tearDown()
 -
 -        def with_spaces(self, *args, **kwargs):
 -            kwargs['stdout'] = subprocess.PIPE
 -            p = subprocess.Popen(*args, **kwargs)
 -            self.assertEqual(
 -              p.stdout.read ().decode("mbcs"),
 -              "2 [%r, 'ab cd']" % self.fname
 -            )
 -
 -        def test_shell_string_with_spaces(self):
 -            # call() function with string argument with spaces on Windows
 -            self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
 -                                             "ab cd"), shell=1)
 +    def test_creationflags(self):
 +        # creationflags argument
 +        CREATE_NEW_CONSOLE = 16
 +        sys.stderr.write("    a DOS box should flash briefly ...\n")
 +        subprocess.call(sys.executable +
 +                        ' -c "import time; time.sleep(0.25)"',
 +                        creationflags=CREATE_NEW_CONSOLE)
 +
 +    def test_invalid_args(self):
 +        # invalid arguments should raise ValueError
 +        self.assertRaises(ValueError, subprocess.call,
 +                          [sys.executable, "-c",
 +                           "import sys; sys.exit(47)"],
 +                          preexec_fn=lambda: 1)
 +        self.assertRaises(ValueError, subprocess.call,
 +                          [sys.executable, "-c",
 +                           "import sys; sys.exit(47)"],
 +                          stdout=subprocess.PIPE,
 +                          close_fds=True)
 +
 +    def test_close_fds(self):
 +        # close file descriptors
 +        rc = subprocess.call([sys.executable, "-c",
 +                              "import sys; sys.exit(47)"],
 +                              close_fds=True)
 +        self.assertEqual(rc, 47)
 +
 +    def test_shell_sequence(self):
 +        # Run command through the shell (sequence)
 +        newenv = os.environ.copy()
 +        newenv["FRUIT"] = "physalis"
 +        p = subprocess.Popen(["set"], shell=1,
 +                             stdout=subprocess.PIPE,
 +                             env=newenv)
 +        self.addCleanup(p.stdout.close)
 +        self.assertIn(b"physalis", p.stdout.read())
 +
 +    def test_shell_string(self):
 +        # Run command through the shell (string)
 +        newenv = os.environ.copy()
 +        newenv["FRUIT"] = "physalis"
 +        p = subprocess.Popen("set", shell=1,
 +                             stdout=subprocess.PIPE,
 +                             env=newenv)
 +        self.addCleanup(p.stdout.close)
 +        self.assertIn(b"physalis", p.stdout.read())
  
 -        def test_shell_sequence_with_spaces(self):
 -            # call() function with sequence argument with spaces on Windows
 -            self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
 +    def test_call_string(self):
 +        # call() function with string argument on Windows
 +        rc = subprocess.call(sys.executable +
 +                             ' -c "import sys; sys.exit(47)"')
 +        self.assertEqual(rc, 47)
 +
 +    def _kill_process(self, method, *args):
 +        # Some win32 buildbot raises EOFError if stdin is inherited
 +        p = subprocess.Popen([sys.executable, "-c", """if 1:
 +                             import sys, time
 +                             sys.stdout.write('x\\n')
 +                             sys.stdout.flush()
 +                             time.sleep(30)
 +                             """],
 +                             stdin=subprocess.PIPE,
 +                             stdout=subprocess.PIPE,
 +                             stderr=subprocess.PIPE)
 +        self.addCleanup(p.stdout.close)
 +        self.addCleanup(p.stderr.close)
 +        self.addCleanup(p.stdin.close)
 +        # Wait for the interpreter to be completely initialized before
 +        # sending any signal.
 +        p.stdout.read(1)
 +        getattr(p, method)(*args)
 +        _, stderr = p.communicate()
 +        self.assertStderrEqual(stderr, b'')
 +        returncode = p.wait()
 +        self.assertNotEqual(returncode, 0)
 +
 +    def test_send_signal(self):
 +        self._kill_process('send_signal', signal.SIGTERM)
 +
 +    def test_kill(self):
 +        self._kill_process('kill')
 +
 +    def test_terminate(self):
 +        self._kill_process('terminate')
 +
 +
 +# The module says:
 +#   "NB This only works (and is only relevant) for UNIX."
 +#
 +# Actually, getoutput should work on any platform with an os.popen, but
 +# I'll take the comment as given, and skip this suite.
 +@unittest.skipUnless(os.name == 'posix', "only relevant for UNIX")
 +class CommandTests(unittest.TestCase):
 +    def test_getoutput(self):
 +        self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
 +        self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
 +                         (0, 'xyzzy'))
 +
 +        # we use mkdtemp in the next line to create an empty directory
 +        # under our exclusive control; from that, we can invent a pathname
 +        # that we _know_ won't exist.  This is guaranteed to fail.
 +        dir = None
 +        try:
 +            dir = tempfile.mkdtemp()
 +            name = os.path.join(dir, "foo")
  
 -        def test_noshell_string_with_spaces(self):
 -            # call() function with string argument with spaces on Windows
 -            self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
 -                                 "ab cd"))
 +            status, output = subprocess.getstatusoutput('cat ' + name)
 +            self.assertNotEqual(status, 0)
 +        finally:
 +            if dir is not None:
 +                os.rmdir(dir)
  
 -        def test_noshell_sequence_with_spaces(self):
 -            # call() function with sequence argument with spaces on Windows
 -            self.with_spaces([sys.executable, self.fname, "ab cd"])
  
 -    unit_tests.append(CommandsWithSpaces)
 +@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
 +                     "poll system call not supported")
 +class ProcessTestCaseNoPoll(ProcessTestCase):
 +    def setUp(self):
 +        subprocess._has_poll = False
 +        ProcessTestCase.setUp(self)
  
 +    def tearDown(self):
 +        subprocess._has_poll = True
 +        ProcessTestCase.tearDown(self)
  
 -if getattr(subprocess, '_has_poll', False):
 -    class ProcessTestCaseNoPoll(ProcessTestCase):
 -        def setUp(self):
 -            subprocess._has_poll = False
 -            ProcessTestCase.setUp(self)
  
 -        def tearDown(self):
 -            subprocess._has_poll = True
 -            ProcessTestCase.tearDown(self)
 +@unittest.skipUnless(getattr(subprocess, '_posixsubprocess', False),
 +                     "_posixsubprocess extension module not found.")
 +class ProcessTestCasePOSIXPurePython(ProcessTestCase, POSIXProcessTestCase):
 +    def setUp(self):
 +        subprocess._posixsubprocess = None
 +        ProcessTestCase.setUp(self)
 +        POSIXProcessTestCase.setUp(self)
  
 -    unit_tests.append(ProcessTestCaseNoPoll)
 +    def tearDown(self):
 +        subprocess._posixsubprocess = sys.modules['_posixsubprocess']
 +        POSIXProcessTestCase.tearDown(self)
 +        ProcessTestCase.tearDown(self)
  
  
  class HelperFunctionTests(unittest.TestCase):
diff --cc Misc/NEWS
Simple merge