self.checkraises(TypeError, '%10.*f', '__mod__', ('foo', 42.))
self.checkraises(ValueError, '%10', '__mod__', (42,))
+ # Outrageously large width or precision should raise ValueError.
+ self.checkraises(ValueError, '%%%df' % (2**64), '__mod__', (3.2))
+ self.checkraises(ValueError, '%%.%df' % (2**64), '__mod__', (3.2))
+
+ self.checkraises(OverflowError, '%*s', '__mod__',
+ (_testcapi.PY_SSIZE_T_MAX + 1, ''))
+ self.checkraises(OverflowError, '%.*f', '__mod__',
+ (_testcapi.INT_MAX + 1, 1. / 7))
+ # Issue 15989
+ self.checkraises(OverflowError, '%*s', '__mod__',
+ (1 << (_testcapi.PY_SSIZE_T_MAX.bit_length() + 1), ''))
+ self.checkraises(OverflowError, '%.*f', '__mod__',
+ (_testcapi.UINT_MAX + 1, 1. / 7))
+
class X(object): pass
self.checkraises(TypeError, 'abc', '__mod__', X())
# test both implementations. This file has lots of examples.
################################################################################
+import abc
+import array
+import errno
+import locale
import os
+import pickle
+import random
+import signal
import sys
import time
-import array
-import random
import unittest
-import weakref
-import abc
-import signal
-import errno
import warnings
-import pickle
+import weakref
+ import _testcapi
-from itertools import cycle, count
from collections import deque, UserList
+from itertools import cycle, count
from test import support
import codecs
t.write("A\rB")
self.assertEqual(r.getvalue(), b"XY\nZA\rB")
+ def test_default_encoding(self):
+ old_environ = dict(os.environ)
+ try:
+ # try to get a user preferred encoding different than the current
+ # locale encoding to check that TextIOWrapper() uses the current
+ # locale encoding and not the user preferred encoding
+ for key in ('LC_ALL', 'LANG', 'LC_CTYPE'):
+ if key in os.environ:
+ del os.environ[key]
+
+ current_locale_encoding = locale.getpreferredencoding(False)
+ b = self.BytesIO()
+ t = self.TextIOWrapper(b)
+ self.assertEqual(t.encoding, current_locale_encoding)
+ finally:
+ os.environ.clear()
+ os.environ.update(old_environ)
+
+ # Issue 15989
+ def test_device_encoding(self):
+ b = self.BytesIO()
+ b.fileno = lambda: _testcapi.INT_MAX + 1
+ self.assertRaises(OverflowError, self.TextIOWrapper, b)
+ b.fileno = lambda: _testcapi.UINT_MAX + 1
+ self.assertRaises(OverflowError, self.TextIOWrapper, b)
+
def test_encoding(self):
# Check the encoding attribute is always set, and valid
b = self.BytesIO()
import tempfile
import unittest
import warnings
++import _testcapi
_DUMMY_SYMLINK = os.path.join(tempfile.gettempdir(),
support.TESTFN + '-dummy-symlink')
os.close(reader)
os.close(writer)
+ @unittest.skipUnless(hasattr(os, 'pipe2'), "test needs os.pipe2()")
+ @support.requires_linux_version(2, 6, 27)
+ def test_pipe2(self):
+ self.assertRaises(TypeError, os.pipe2, 'DEADBEEF')
+ self.assertRaises(TypeError, os.pipe2, 0, 0)
+
+ # try calling with flags = 0, like os.pipe()
+ r, w = os.pipe2(0)
+ os.close(r)
+ os.close(w)
+
+ # test flags
+ r, w = os.pipe2(os.O_CLOEXEC|os.O_NONBLOCK)
+ self.addCleanup(os.close, r)
+ self.addCleanup(os.close, w)
+ self.assertTrue(fcntl.fcntl(r, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
+ self.assertTrue(fcntl.fcntl(w, fcntl.F_GETFD) & fcntl.FD_CLOEXEC)
+ # try reading from an empty pipe: this should fail, not block
+ self.assertRaises(OSError, os.read, r, 1)
+ # try a write big enough to fill-up the pipe: this should either
+ # fail or perform a partial write, not block
+ try:
+ os.write(w, b'x' * support.PIPE_MAX_SIZE)
+ except OSError:
+ pass
+
++ # Issue 15989
++ self.assertRaises(OverflowError, os.pipe2, _testcapi.INT_MAX + 1)
++ self.assertRaises(OverflowError, os.pipe2, _testcapi.UINT_MAX + 1)
++
def test_utime(self):
if hasattr(posix, 'utime'):
now = time.time()
self.assertRaises(ValueError, fp.writable)
self.assertRaises(ValueError, fp.seekable)
- def test_listen_backlog0(self):
+ def test_pickle(self):
+ sock = socket.socket()
+ with sock:
+ for protocol in range(pickle.HIGHEST_PROTOCOL + 1):
+ self.assertRaises(TypeError, pickle.dumps, sock, protocol)
+
+ def test_listen_backlog(self):
+ for backlog in 0, -1:
+ srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ srv.bind((HOST, 0))
+ srv.listen(backlog)
+ srv.close()
+
+ # Issue 15989
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.bind((HOST, 0))
- # backlog = 0
- srv.listen(0)
+ self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
srv.close()
- @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
+ @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 required for this test.')
def test_flowinfo(self):
self.assertRaises(OverflowError, socket.getnameinfo,
('::1',0, 0xffffffff), 0)
goto error;
}
- Py_BEGIN_ALLOW_THREADS
errno = 0;
+ if (opener == Py_None) {
+ Py_BEGIN_ALLOW_THREADS
#ifdef MS_WINDOWS
- if (widename != NULL)
- self->fd = _wopen(widename, flags, 0666);
- else
+ if (widename != NULL)
+ self->fd = _wopen(widename, flags, 0666);
+ else
#endif
- self->fd = open(name, flags, 0666);
- Py_END_ALLOW_THREADS
+ self->fd = open(name, flags, 0666);
+ Py_END_ALLOW_THREADS
+ } else {
+ PyObject *fdobj = PyObject_CallFunction(
+ opener, "Oi", nameobj, flags);
+ if (fdobj == NULL)
+ goto error;
+ if (!PyLong_Check(fdobj)) {
+ Py_DECREF(fdobj);
+ PyErr_SetString(PyExc_TypeError,
+ "expected integer from opener");
+ goto error;
+ }
+
- self->fd = PyLong_AsLong(fdobj);
++ self->fd = _PyLong_AsInt(fdobj);
+ Py_DECREF(fdobj);
+ if (self->fd == -1) {
+ goto error;
+ }
+ }
+
fd_is_own = 1;
if (self->fd < 0) {
#ifdef MS_WINDOWS
}
}
else {
- int fd = (int) PyLong_AsLong(fileno);
- self->encoding = PyObject_CallMethod(state->os_module,
- "device_encoding",
- "N", fileno);
++ int fd = _PyLong_AsInt(fileno);
+ Py_DECREF(fileno);
+ if (fd == -1 && PyErr_Occurred()) {
+ goto error;
+ }
+
+ self->encoding = _Py_device_encoding(fd);
if (self->encoding == NULL)
goto error;
else if (!PyUnicode_Check(self->encoding))
/* elem must always be a sequence, however simple */
PyObject* elem = PySequence_GetItem(tuple, i);
int ok = elem != NULL;
-- long type = 0;
++ int type = 0;
char *strn = 0;
if (ok)
ok = 0;
else {
ok = PyLong_Check(temp);
-- if (ok)
-- type = PyLong_AS_LONG(temp);
++ if (ok) {
++ type = _PyLong_AsInt(temp);
++ if (type == -1 && PyErr_Occurred()) {
++ Py_DECREF(temp);
++ Py_DECREF(elem);
++ return 0;
++ }
++ }
Py_DECREF(temp);
}
}
if (len == 3) {
PyObject *o = PySequence_GetItem(elem, 2);
if (o != NULL) {
-- if (PyLong_Check(o))
-- *line_num = PyLong_AS_LONG(o);
++ if (PyLong_Check(o)) {
++ int num = _PyLong_AsInt(o);
++ if (num == -1 && PyErr_Occurred()) {
++ Py_DECREF(o);
++ Py_DECREF(temp);
++ Py_DECREF(elem);
++ return 0;
++ }
++ *line_num = num;
++ }
else {
PyErr_Format(parser_error,
"third item in terminal node must be an"
}
#endif /* HAVE_PIPE */
- flags = PyLong_AsLong(arg);
+#ifdef HAVE_PIPE2
+PyDoc_STRVAR(posix_pipe2__doc__,
+"pipe2(flags) -> (read_end, write_end)\n\n\
+Create a pipe with flags set atomically.\n\
+flags can be constructed by ORing together one or more of these values:\n\
+O_NONBLOCK, O_CLOEXEC.\n\
+");
+
+static PyObject *
+posix_pipe2(PyObject *self, PyObject *arg)
+{
+ int flags;
+ int fds[2];
+ int res;
+
++ flags = _PyLong_AsInt(arg);
+ if (flags == -1 && PyErr_Occurred())
+ return NULL;
+
+ res = pipe2(fds, flags);
+ if (res != 0)
+ return posix_error();
+ return Py_BuildValue("(ii)", fds[0], fds[1]);
+}
+#endif /* HAVE_PIPE2 */
+
+#ifdef HAVE_WRITEV
+PyDoc_STRVAR(posix_writev__doc__,
+"writev(fd, buffers) -> byteswritten\n\n\
+Write the contents of buffers to a file descriptor, where buffers is an\n\
+arbitrary sequence of buffers.\n\
+Returns the total bytes written.");
+
+static PyObject *
+posix_writev(PyObject *self, PyObject *args)
+{
+ int fd, cnt;
+ Py_ssize_t res;
+ PyObject *seq;
+ struct iovec *iov;
+ Py_buffer *buf;
+ if (!PyArg_ParseTuple(args, "iO:writev", &fd, &seq))
+ return NULL;
+ if (!PySequence_Check(seq)) {
+ PyErr_SetString(PyExc_TypeError,
+ "writev() arg 2 must be a sequence");
+ return NULL;
+ }
+ cnt = PySequence_Size(seq);
+
+ if (!iov_setup(&iov, &buf, seq, cnt, PyBUF_SIMPLE)) {
+ return NULL;
+ }
+
+ Py_BEGIN_ALLOW_THREADS
+ res = writev(fd, iov, cnt);
+ Py_END_ALLOW_THREADS
+
+ iov_cleanup(iov, buf, cnt);
+ return PyLong_FromSsize_t(res);
+}
+#endif
+
+#ifdef HAVE_PWRITE
+PyDoc_STRVAR(posix_pwrite__doc__,
+"pwrite(fd, string, offset) -> byteswritten\n\n\
+Write string to a file descriptor, fd, from offset, leaving the file\n\
+offset unchanged.");
+
+static PyObject *
+posix_pwrite(PyObject *self, PyObject *args)
+{
+ Py_buffer pbuf;
+ int fd;
+ off_t offset;
+ Py_ssize_t size;
+
+ if (!PyArg_ParseTuple(args, "iy*O&:pwrite", &fd, &pbuf, _parse_off_t, &offset))
+ return NULL;
+
+ if (!_PyVerify_fd(fd)) {
+ PyBuffer_Release(&pbuf);
+ return posix_error();
+ }
+ Py_BEGIN_ALLOW_THREADS
+ size = pwrite(fd, pbuf.buf, (size_t)pbuf.len, offset);
+ Py_END_ALLOW_THREADS
+ PyBuffer_Release(&pbuf);
+ if (size < 0)
+ return posix_error();
+ return PyLong_FromSsize_t(size);
+}
+#endif
#ifdef HAVE_MKFIFO
PyDoc_STRVAR(posix_mkfifo__doc__,
{
int fd;
PyObject *meth;
+ _Py_IDENTIFIER(fileno);
if (PyLong_Check(o)) {
- fd = PyLong_AsLong(o);
+ fd = _PyLong_AsInt(o);
}
- else if ((meth = PyObject_GetAttrString(o, "fileno")) != NULL)
+ else if ((meth = _PyObject_GetAttrId(o, &PyId_fileno)) != NULL)
{
PyObject *fno = PyEval_CallObject(meth, NULL);
Py_DECREF(meth);