getattr(p, method)(*args)
return p
+ def _kill_dead_process(self, method, *args):
+ # Do not inherit file handles from the parent.
+ # It should fix failures on some platforms.
+ p = subprocess.Popen([sys.executable, "-c", """if 1:
+ import sys, time
+ sys.stdout.write('x\\n')
+ sys.stdout.flush()
+ """],
+ close_fds=True,
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ # Wait for the interpreter to be completely initialized before
+ # sending any signal.
+ p.stdout.read(1)
+ # The process should end after this
+ time.sleep(1)
+ # This shouldn't raise even though the child is now dead
+ getattr(p, method)(*args)
+ p.communicate()
+
def test_send_signal(self):
p = self._kill_process('send_signal', signal.SIGINT)
_, stderr = p.communicate()
self.assertStderrEqual(stderr, '')
self.assertEqual(p.wait(), -signal.SIGTERM)
+ def test_send_signal_dead(self):
+ # Sending a signal to a dead process
+ self._kill_dead_process('send_signal', signal.SIGINT)
+
+ def test_kill_dead(self):
+ # Killing a dead process
+ self._kill_dead_process('kill')
+
+ def test_terminate_dead(self):
+ # Terminating a dead process
+ self._kill_dead_process('terminate')
+
def check_close_std_fds(self, fds):
# Issue #9905: test that subprocess pipes still work properly with
# some standard fds closed
returncode = p.wait()
self.assertNotEqual(returncode, 0)
+ def _kill_dead_process(self, method, *args):
+ p = subprocess.Popen([sys.executable, "-c", """if 1:
+ import sys, time
+ sys.stdout.write('x\\n')
+ sys.stdout.flush()
+ sys.exit(42)
+ """],
+ stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ self.addCleanup(p.stdout.close)
+ self.addCleanup(p.stderr.close)
+ self.addCleanup(p.stdin.close)
+ # Wait for the interpreter to be completely initialized before
+ # sending any signal.
+ p.stdout.read(1)
+ # The process should end after this
+ time.sleep(1)
+ # This shouldn't raise even though the child is now dead
+ getattr(p, method)(*args)
+ _, stderr = p.communicate()
+ self.assertStderrEqual(stderr, b'')
+ rc = p.wait()
+ self.assertEqual(rc, 42)
+
def test_send_signal(self):
self._kill_process('send_signal', signal.SIGTERM)
def test_terminate(self):
self._kill_process('terminate')
+ def test_send_signal_dead(self):
+ self._kill_dead_process('send_signal', signal.SIGTERM)
+
+ def test_kill_dead(self):
+ self._kill_dead_process('kill')
+
+ def test_terminate_dead(self):
+ self._kill_dead_process('terminate')
+
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
"poll system call not supported")