import weakref
import gc
import abc
+import signal
+import errno
from itertools import chain, cycle, count
from collections import deque
from test import support
class PyMiscIOTest(MiscIOTest):
io = pyio
+
+@unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
+class SignalsTest(unittest.TestCase):
+
+ def setUp(self):
+ self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
+
+ def tearDown(self):
+ signal.signal(signal.SIGALRM, self.oldalrm)
+
+ def alarm_interrupt(self, sig, frame):
+ 1/0
+
+ @unittest.skipUnless(threading, 'Threading required for this test.')
+ def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
+ """Check that a partial write, when it gets interrupted, properly
+ invokes the signal handler."""
+ read_results = []
+ def _read():
+ s = os.read(r, 1)
+ read_results.append(s)
+ t = threading.Thread(target=_read)
+ t.daemon = True
+ r, w = os.pipe()
+ try:
+ wio = self.io.open(w, **fdopen_kwargs)
+ t.start()
+ signal.alarm(1)
+ # Fill the pipe enough that the write will be blocking.
+ # It will be interrupted by the timer armed above. Since the
+ # other thread has read one byte, the low-level write will
+ # return with a successful (partial) result rather than an EINTR.
+ # The buffered IO layer must check for pending signal
+ # handlers, which in this case will invoke alarm_interrupt().
+ self.assertRaises(ZeroDivisionError,
+ wio.write, item * (1024 * 1024))
+ t.join()
+ # We got one byte, get another one and check that it isn't a
+ # repeat of the first one.
+ read_results.append(os.read(r, 1))
+ self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
+ finally:
+ os.close(w)
+ os.close(r)
+ # This is deliberate. If we didn't close the file descriptor
+ # before closing wio, wio would try to flush its internal
+ # buffer, and block again.
+ try:
+ wio.close()
+ except IOError as e:
+ if e.errno != errno.EBADF:
+ raise
+
+ def test_interrupted_write_unbuffered(self):
+ self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
+
+ def test_interrupted_write_buffered(self):
+ self.check_interrupted_write(b"xy", b"xy", mode="wb")
+
+ def test_interrupted_write_text(self):
+ self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
+
+class CSignalsTest(SignalsTest):
+ io = io
+
+class PySignalsTest(SignalsTest):
+ io = pyio
+
+
def test_main():
tests = (CIOTest, PyIOTest,
CBufferedReaderTest, PyBufferedReaderTest,
StatefulIncrementalDecoderTest,
CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
CTextIOWrapperTest, PyTextIOWrapperTest,
- CMiscIOTest, PyMiscIOTest,)
+ CMiscIOTest, PyMiscIOTest,
+ CSignalsTest, PySignalsTest,
+ )
# Put the namespaces of the IO module we are testing and some useful mock
# classes in the __dict__ of each test.
self->write_pos += n;
self->raw_pos = self->write_pos;
written += Py_SAFE_DOWNCAST(n, Py_off_t, Py_ssize_t);
+ /* Partial writes can return successfully when interrupted by a
+ signal (see write(2)). We must run signal handlers before
+ blocking another time, possibly indefinitely. */
+ if (PyErr_CheckSignals() < 0)
+ goto error;
}
if (restore_pos) {
}
written += n;
remaining -= n;
+ /* Partial writes can return successfully when interrupted by a
+ signal (see write(2)). We must run signal handlers before
+ blocking another time, possibly indefinitely. */
+ if (PyErr_CheckSignals() < 0)
+ goto error;
}
if (self->readable)
_bufferedreader_reset_buf(self);