try:
if (follow_symlinks or (st.S_ISDIR(orig_st.st_mode) and
path.samestat(orig_st, stat(topfd)))):
- yield from _fwalk(topfd, top, topdown, onerror, follow_symlinks)
+ yield from _fwalk(topfd, top, isinstance(top, bytes),
+ topdown, onerror, follow_symlinks)
finally:
close(topfd)
- def _fwalk(topfd, toppath, topdown, onerror, follow_symlinks):
+ def _fwalk(topfd, toppath, isbytes, topdown, onerror, follow_symlinks):
# Note: This uses O(depth of the directory tree) file descriptors: if
# necessary, it can be adapted to only require O(1) FDs, see issue
# #13734.
names = listdir(topfd)
+ if isbytes:
+ names = map(fsencode, names)
dirs, nondirs = [], []
for name in names:
try:
try:
if follow_symlinks or path.samestat(orig_st, stat(dirfd)):
dirpath = path.join(toppath, name)
- yield from _fwalk(dirfd, dirpath, topdown, onerror, follow_symlinks)
+ yield from _fwalk(dirfd, dirpath, isbytes,
+ topdown, onerror, follow_symlinks)
finally:
close(dirfd)
"""Tests for os.fwalk()."""
def walk(self, top, **kwargs):
- for root, dirs, files, root_fd in os.fwalk(top, **kwargs):
+ for root, dirs, files, root_fd in self.fwalk(top, **kwargs):
yield (root, dirs, files)
+ def fwalk(self, *args, **kwargs):
+ return os.fwalk(*args, **kwargs)
+
def _compare_to_walk(self, walk_kwargs, fwalk_kwargs):
"""
compare with walk() results.
for root, dirs, files in os.walk(**walk_kwargs):
expected[root] = (set(dirs), set(files))
- for root, dirs, files, rootfd in os.fwalk(**fwalk_kwargs):
+ for root, dirs, files, rootfd in self.fwalk(**fwalk_kwargs):
self.assertIn(root, expected)
self.assertEqual(expected[root], (set(dirs), set(files)))
# check returned file descriptors
for topdown, follow_symlinks in itertools.product((True, False), repeat=2):
args = support.TESTFN, topdown, None
- for root, dirs, files, rootfd in os.fwalk(*args, follow_symlinks=follow_symlinks):
+ for root, dirs, files, rootfd in self.fwalk(*args, follow_symlinks=follow_symlinks):
# check that the FD is valid
os.fstat(rootfd)
# redundant check
minfd = os.dup(1)
os.close(minfd)
for i in range(256):
- for x in os.fwalk(support.TESTFN):
+ for x in self.fwalk(support.TESTFN):
pass
newfd = os.dup(1)
self.addCleanup(os.close, newfd)
class BytesWalkTests(WalkTests):
"""Tests for os.walk() with bytes."""
- def setUp(self):
- super().setUp()
- self.stack = contextlib.ExitStack()
-
- def tearDown(self):
- self.stack.close()
- super().tearDown()
-
def walk(self, top, **kwargs):
if 'follow_symlinks' in kwargs:
kwargs['followlinks'] = kwargs.pop('follow_symlinks')
bdirs[:] = list(map(os.fsencode, dirs))
bfiles[:] = list(map(os.fsencode, files))
+@unittest.skipUnless(hasattr(os, 'fwalk'), "Test needs os.fwalk()")
+class BytesFwalkTests(FwalkTests):
+ """Tests for os.walk() with bytes."""
+ def fwalk(self, top='.', *args, **kwargs):
+ for broot, bdirs, bfiles, topfd in os.fwalk(os.fsencode(top), *args, **kwargs):
+ root = os.fsdecode(broot)
+ dirs = list(map(os.fsdecode, bdirs))
+ files = list(map(os.fsdecode, bfiles))
+ yield (root, dirs, files, topfd)
+ bdirs[:] = list(map(os.fsencode, dirs))
+ bfiles[:] = list(map(os.fsencode, files))
+
class MakedirTests(unittest.TestCase):
def setUp(self):