sub-second periodicity (contrarily to signal()).
"""
+import contextlib
import faulthandler
import io
import os
from test import support
+@contextlib.contextmanager
+def kill_on_error(proc):
+ """Context manager killing the subprocess if a Python exception is raised."""
+ with proc:
+ try:
+ yield proc
+ except:
+ proc.kill()
+ raise
+
@unittest.skipUnless(hasattr(signal, "setitimer"), "requires setitimer()")
class EINTRBaseTest(unittest.TestCase):
def setUpClass(cls):
cls.orig_handler = signal.signal(signal.SIGALRM, lambda *args: None)
if hasattr(faulthandler, 'dump_traceback_later'):
- # Most tests take less than 30 seconds, so 15 minutes should be
+ # Most tests take less than 30 seconds, so 5 minutes should be
# enough. dump_traceback_later() is implemented with a thread, but
# pthread_sigmask() is used to mask all signaled on this thread.
faulthandler.dump_traceback_later(5 * 60, exit=True)
' os.write(wr, data)',
))
- with self.subprocess(code, str(wr), pass_fds=[wr]) as proc:
+ proc = self.subprocess(code, str(wr), pass_fds=[wr])
+ with kill_on_error(proc):
os.close(wr)
for data in datas:
self.assertEqual(data, os.read(rd, len(data)))
' % (len(value), data_len))',
))
- with self.subprocess(code, str(rd), pass_fds=[rd]) as proc:
+ proc = self.subprocess(code, str(rd), pass_fds=[rd])
+ with kill_on_error(proc):
os.close(rd)
written = 0
while written < len(data):
fd = wr.fileno()
proc = self.subprocess(code, str(fd), pass_fds=[fd])
- with proc:
+ with kill_on_error(proc):
wr.close()
for data in datas:
self.assertEqual(data, recv_func(rd, len(data)))
fd = rd.fileno()
proc = self.subprocess(code, str(fd), pass_fds=[fd])
- with proc:
+ with kill_on_error(proc):
rd.close()
written = 0
while written < len(data):
' time.sleep(sleep_time)',
))
- with self.subprocess(code) as proc:
+ proc = self.subprocess(code)
+ with kill_on_error(proc):
client_sock, _ = sock.accept()
client_sock.close()
self.assertEqual(proc.wait(), 0)
do_open_close_reader,
))
- with self.subprocess(code) as proc:
+ proc = self.subprocess(code)
+ with kill_on_error(proc):
do_open_close_writer(filename)
self.assertEqual(proc.wait(), 0)
))
t0 = time.monotonic()
- with self.subprocess(code) as proc:
+ proc = self.subprocess(code)
+ with kill_on_error(proc):
# parent
signal.sigwaitinfo([signum])
dt = time.monotonic() - t0