yield
+def create_file(filename, content=b'content'):
+ with open(filename, "xb", 0) as fp:
+ fp.write(content)
+
+
# Tests creating TESTFN
class FileTests(unittest.TestCase):
def setUp(self):
"needs INT_MAX < PY_SSIZE_T_MAX")
@support.bigmemtest(size=INT_MAX + 10, memuse=1, dry_run=False)
def test_large_read(self, size):
- with open(support.TESTFN, "wb") as fp:
- fp.write(b'test')
self.addCleanup(support.unlink, support.TESTFN)
+ create_file(support.TESTFN, b'test')
# Issue #21932: Make sure that os.read() does not raise an
# OverflowError for size larger than INT_MAX
def test_replace(self):
TESTFN2 = support.TESTFN + ".2"
- with open(support.TESTFN, 'w') as f:
- f.write("1")
- with open(TESTFN2, 'w') as f:
- f.write("2")
- self.addCleanup(os.unlink, TESTFN2)
+ self.addCleanup(support.unlink, support.TESTFN)
+ self.addCleanup(support.unlink, TESTFN2)
+
+ create_file(support.TESTFN, b"1")
+ create_file(TESTFN2, b"2")
+
os.replace(support.TESTFN, TESTFN2)
self.assertRaises(FileNotFoundError, os.stat, support.TESTFN)
with open(TESTFN2, 'r') as f:
def setUp(self):
self.fname = support.TESTFN
self.addCleanup(support.unlink, self.fname)
- with open(self.fname, 'wb') as fp:
- fp.write(b"ABC")
+ create_file(self.fname, b"ABC")
@unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
def check_stat_attributes(self, fname):
self.addCleanup(support.rmtree, self.dirname)
os.mkdir(self.dirname)
- with open(self.fname, 'wb') as fp:
- fp.write(b"ABC")
+ create_file(self.fname)
def restore_float_times(state):
with ignore_deprecation_warnings('stat_float_times'):
"fd support for utime required for this test.")
def test_utime_fd(self):
def set_time(filename, ns):
- with open(filename, 'wb') as fp:
+ with open(filename, 'wb', 0) as fp:
# use a file descriptor to test futimens(timespec)
# or futimes(timeval)
os.utime(fp.fileno(), ns=ns)
os.mkdir(dira)
dirb = os.path.join(dira, 'dirb')
os.mkdir(dirb)
- with open(os.path.join(dira, 'file.txt'), 'w') as f:
- f.write('text')
+ create_file(os.path.join(dira, 'file.txt'))
os.removedirs(dirb)
self.assertFalse(os.path.exists(dirb))
self.assertTrue(os.path.exists(dira))
os.mkdir(dira)
dirb = os.path.join(dira, 'dirb')
os.mkdir(dirb)
- with open(os.path.join(dirb, 'file.txt'), 'w') as f:
- f.write('text')
+ create_file(os.path.join(dirb, 'file.txt'))
with self.assertRaises(OSError):
os.removedirs(dirb)
self.assertTrue(os.path.exists(dirb))
class DevNullTests(unittest.TestCase):
def test_devnull(self):
- with open(os.devnull, 'wb') as f:
+ with open(os.devnull, 'wb', 0) as f:
f.write(b'hello')
f.close()
with open(os.devnull, 'rb') as f:
def test_urandom_fd_reopened(self):
# Issue #21207: urandom() should detect its fd to /dev/urandom
# changed to something else, and reopen it.
- with open(support.TESTFN, 'wb') as f:
- f.write(b"x" * 256)
- self.addCleanup(os.unlink, support.TESTFN)
+ self.addCleanup(support.unlink, support.TESTFN)
+ create_file(support.TESTFN, b"x" * 256)
+
code = """if 1:
import os
import sys
self.assertRaises(OSError, os.chdir, support.TESTFN)
def test_mkdir(self):
- f = open(support.TESTFN, "w")
- try:
+ self.addCleanup(support.unlink, support.TESTFN)
+
+ with open(support.TESTFN, "w") as f:
self.assertRaises(OSError, os.mkdir, support.TESTFN)
- finally:
- f.close()
- os.unlink(support.TESTFN)
def test_utime(self):
self.assertRaises(OSError, os.utime, support.TESTFN, None)
level1 = os.path.abspath(support.TESTFN)
level2 = os.path.join(level1, "level2")
level3 = os.path.join(level2, "level3")
- try:
- os.mkdir(level1)
- os.mkdir(level2)
- os.mkdir(level3)
+ self.addCleanup(support.rmtree, level1)
- file1 = os.path.abspath(os.path.join(level1, "file1"))
+ os.mkdir(level1)
+ os.mkdir(level2)
+ os.mkdir(level3)
- with open(file1, "w") as f:
- f.write("file1")
+ file1 = os.path.abspath(os.path.join(level1, "file1"))
+ create_file(file1)
- orig_dir = os.getcwd()
- try:
- os.chdir(level2)
- link = os.path.join(level2, "link")
- os.symlink(os.path.relpath(file1), "link")
- self.assertIn("link", os.listdir(os.getcwd()))
-
- # Check os.stat calls from the same dir as the link
- self.assertEqual(os.stat(file1), os.stat("link"))
-
- # Check os.stat calls from a dir below the link
- os.chdir(level1)
- self.assertEqual(os.stat(file1),
- os.stat(os.path.relpath(link)))
-
- # Check os.stat calls from a dir above the link
- os.chdir(level3)
- self.assertEqual(os.stat(file1),
- os.stat(os.path.relpath(link)))
- finally:
- os.chdir(orig_dir)
- except OSError as err:
- self.fail(err)
+ orig_dir = os.getcwd()
+ try:
+ os.chdir(level2)
+ link = os.path.join(level2, "link")
+ os.symlink(os.path.relpath(file1), "link")
+ self.assertIn("link", os.listdir(os.getcwd()))
+
+ # Check os.stat calls from the same dir as the link
+ self.assertEqual(os.stat(file1), os.stat("link"))
+
+ # Check os.stat calls from a dir below the link
+ os.chdir(level1)
+ self.assertEqual(os.stat(file1),
+ os.stat(os.path.relpath(link)))
+
+ # Check os.stat calls from a dir above the link
+ os.chdir(level3)
+ self.assertEqual(os.stat(file1),
+ os.stat(os.path.relpath(link)))
finally:
- os.remove(file1)
- shutil.rmtree(level1)
+ os.chdir(orig_dir)
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
try:
new_prio = os.getpriority(os.PRIO_PROCESS, os.getpid())
if base >= 19 and new_prio <= 19:
- raise unittest.SkipTest(
- "unable to reliably test setpriority at current nice level of %s" % base)
+ raise unittest.SkipTest("unable to reliably test setpriority "
+ "at current nice level of %s" % base)
else:
self.assertEqual(new_prio, base + 1)
finally:
@classmethod
def setUpClass(cls):
cls.key = support.threading_setup()
- with open(support.TESTFN, "wb") as f:
- f.write(cls.DATA)
+ create_file(support.TESTFN, cls.DATA)
@classmethod
def tearDownClass(cls):
def test_trailers(self):
TESTFN2 = support.TESTFN + "2"
file_data = b"abcdef"
- with open(TESTFN2, 'wb') as f:
- f.write(file_data)
- with open(TESTFN2, 'rb')as f:
- self.addCleanup(os.remove, TESTFN2)
+
+ self.addCleanup(support.unlink, TESTFN2)
+ create_file(TESTFN2, file_data)
+
+ with open(TESTFN2, 'rb') as f:
os.sendfile(self.sockno, f.fileno(), 0, len(file_data),
trailers=[b"1234"])
self.client.close()
def supports_extended_attributes():
if not hasattr(os, "setxattr"):
return False
+
try:
- with open(support.TESTFN, "wb") as fp:
+ with open(support.TESTFN, "xb", 0) as fp:
try:
os.setxattr(fp.fileno(), b"user.test", b"")
except OSError:
@support.requires_linux_version(2, 6, 39)
class ExtendedAttributeTests(unittest.TestCase):
- def tearDown(self):
- support.unlink(support.TESTFN)
-
def _check_xattrs_str(self, s, getxattr, setxattr, removexattr, listxattr, **kwargs):
fn = support.TESTFN
- open(fn, "wb").close()
+ self.addCleanup(support.unlink, fn)
+ create_file(fn)
+
with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
init_xattr = listxattr(fn)
self.assertIsInstance(init_xattr, list)
+
setxattr(fn, s("user.test"), b"", **kwargs)
xattr = set(init_xattr)
xattr.add("user.test")
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"")
setxattr(fn, s("user.test"), b"hello", os.XATTR_REPLACE, **kwargs)
self.assertEqual(getxattr(fn, b"user.test", **kwargs), b"hello")
+
with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test"), b"bye", os.XATTR_CREATE, **kwargs)
self.assertEqual(cm.exception.errno, errno.EEXIST)
+
with self.assertRaises(OSError) as cm:
setxattr(fn, s("user.test2"), b"bye", os.XATTR_REPLACE, **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
setxattr(fn, s("user.test2"), b"foo", os.XATTR_CREATE, **kwargs)
xattr.add("user.test2")
self.assertEqual(set(listxattr(fn)), xattr)
removexattr(fn, s("user.test"), **kwargs)
+
with self.assertRaises(OSError) as cm:
getxattr(fn, s("user.test"), **kwargs)
self.assertEqual(cm.exception.errno, errno.ENODATA)
+
xattr.remove("user.test")
self.assertEqual(set(listxattr(fn)), xattr)
self.assertEqual(getxattr(fn, s("user.test2"), **kwargs), b"foo")
self.assertEqual(set(listxattr(fn)), set(init_xattr) | set(many))
def _check_xattrs(self, *args, **kwargs):
- def make_bytes(s):
- return bytes(s, "ascii")
self._check_xattrs_str(str, *args, **kwargs)
support.unlink(support.TESTFN)
- self._check_xattrs_str(make_bytes, *args, **kwargs)
+
+ self._check_xattrs_str(os.fsencode, *args, **kwargs)
+ support.unlink(support.TESTFN)
def test_simple(self):
self._check_xattrs(os.getxattr, os.setxattr, os.removexattr,
with open(path, "rb") as fp:
return os.getxattr(fp.fileno(), *args)
def setxattr(path, *args):
- with open(path, "wb") as fp:
+ with open(path, "wb", 0) as fp:
os.setxattr(fp.fileno(), *args)
def removexattr(path, *args):
- with open(path, "wb") as fp:
+ with open(path, "wb", 0) as fp:
os.removexattr(fp.fileno(), *args)
def listxattr(path, *args):
with open(path, "rb") as fp:
def create_file(self, name="file.txt"):
filename = os.path.join(self.path, name)
- with open(filename, "wb") as fp:
- fp.write(b'python')
+ create_file(filename, b'python')
return filename
def get_entries(self, names):