class SiginterruptTest(unittest.TestCase):
signum = signal.SIGUSR1
- def readpipe_interrupted(self, cb):
+
+ def setUp(self):
+ """Install a no-op signal handler that can be set to allow
+ interrupts or not, and arrange for the original signal handler to be
+ re-installed when the test is finished.
+ """
+ oldhandler = signal.signal(self.signum, lambda x,y: None)
+ self.addCleanup(signal.signal, self.signum, oldhandler)
+
+ def readpipe_interrupted(self):
+ """Perform a read during which a signal will arrive. Return True if the
+ read is interrupted by the signal and raises an exception. Return False
+ if it returns normally.
+ """
+ # Create a pipe that can be used for the read. Also clean it up
+ # when the test is over, since nothing else will (but see below for
+ # the write end).
r, w = os.pipe()
+ self.addCleanup(os.close, r)
+
+ # Create another process which can send a signal to this one to try
+ # to interrupt the read.
ppid = os.getpid()
pid = os.fork()
- oldhandler = signal.signal(self.signum, lambda x,y: None)
- cb()
- if pid==0:
- # child code: sleep, kill, sleep. and then exit,
- # which closes the pipe from which the parent process reads
+ if pid == 0:
+ # Child code: sleep to give the parent enough time to enter the
+ # read() call (there's a race here, but it's really tricky to
+ # eliminate it); then signal the parent process. Also, sleep
+ # again to make it likely that the signal is delivered to the
+ # parent process before the child exits. If the child exits
+ # first, the write end of the pipe will be closed and the test
+ # is invalid.
try:
time.sleep(0.2)
os.kill(ppid, self.signum)
time.sleep(0.2)
finally:
+ # No matter what, just exit as fast as possible now.
exit_subprocess()
-
- try:
+ else:
+ # Parent code.
+ # Make sure the child is eventually reaped, else it'll be a
+ # zombie for the rest of the test suite run.
+ self.addCleanup(os.waitpid, pid, 0)
+
+ # Close the write end of the pipe. The child has a copy, so
+ # it's not really closed until the child exits. We need it to
+ # close when the child exits so that in the non-interrupt case
+ # the read eventually completes, otherwise we could just close
+ # it *after* the test.
os.close(w)
+ # Try the read and report whether it is interrupted or not to
+ # the caller.
try:
- d=os.read(r, 1)
+ d = os.read(r, 1)
return False
except OSError as err:
if err.errno != errno.EINTR:
raise
return True
- finally:
- signal.signal(self.signum, oldhandler)
- os.waitpid(pid, 0)
def test_without_siginterrupt(self):
- i=self.readpipe_interrupted(lambda: None)
- self.assertEquals(i, True)
+ """If a signal handler is installed and siginterrupt is not called
+ at all, when that signal arrives, it interrupts a syscall that's in
+ progress.
+ """
+ i = self.readpipe_interrupted()
+ self.assertTrue(i)
+ # Arrival of the signal shouldn't have changed anything.
+ i = self.readpipe_interrupted()
+ self.assertTrue(i)
def test_siginterrupt_on(self):
- i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 1))
- self.assertEquals(i, True)
+ """If a signal handler is installed and siginterrupt is called with
+ a true value for the second argument, when that signal arrives, it
+ interrupts a syscall that's in progress.
+ """
+ signal.siginterrupt(self.signum, 1)
+ i = self.readpipe_interrupted()
+ self.assertTrue(i)
+ # Arrival of the signal shouldn't have changed anything.
+ i = self.readpipe_interrupted()
+ self.assertTrue(i)
def test_siginterrupt_off(self):
- i=self.readpipe_interrupted(lambda: signal.siginterrupt(self.signum, 0))
- self.assertEquals(i, False)
+ """If a signal handler is installed and siginterrupt is called with
+ a false value for the second argument, when that signal arrives, it
+ does not interrupt a syscall that's in progress.
+ """
+ signal.siginterrupt(self.signum, 0)
+ i = self.readpipe_interrupted()
+ self.assertFalse(i)
+ # Arrival of the signal shouldn't have changed anything.
+ i = self.readpipe_interrupted()
+ self.assertFalse(i)
+
+
class ItimerTest(unittest.TestCase):
def setUp(self):