from contextlib import ExitStack
from os.path import splitdrive
from distutils.spawn import find_executable, spawn
-from shutil import (_make_tarball, _make_zipfile, make_archive,
+from shutil import (make_archive,
register_archive_format, unregister_archive_format,
get_archive_formats, Error, unpack_archive,
register_unpack_format, RegistryError,
with open(path, 'rb' if binary else 'r') as fp:
return fp.read()
+def rlistdir(path):
+ res = []
+ for name in sorted(os.listdir(path)):
+ p = os.path.join(path, name)
+ if os.path.isdir(p) and not os.path.islink(p):
+ res.append(name + '/')
+ for n in rlistdir(p):
+ res.append(name + '/' + n)
+ else:
+ res.append(name)
+ return res
+
class TestShutil(unittest.TestCase):
@requires_zlib
def test_make_tarball(self):
# creating something to tar
- tmpdir = self.mkdtemp()
- write_file((tmpdir, 'file1'), 'xxx')
- write_file((tmpdir, 'file2'), 'xxx')
- os.mkdir(os.path.join(tmpdir, 'sub'))
- write_file((tmpdir, 'sub', 'file3'), 'xxx')
+ root_dir, base_dir = self._create_files('')
tmpdir2 = self.mkdtemp()
# force shutil to create the directory
os.rmdir(tmpdir2)
- unittest.skipUnless(splitdrive(tmpdir)[0] == splitdrive(tmpdir2)[0],
+ unittest.skipUnless(splitdrive(root_dir)[0] == splitdrive(tmpdir2)[0],
"source and target should be on same drive")
base_name = os.path.join(tmpdir2, 'archive')
# working with relative paths to avoid tar warnings
- with support.change_cwd(tmpdir):
- _make_tarball(splitdrive(base_name)[1], '.')
+ make_archive(splitdrive(base_name)[1], 'gztar', root_dir, '.')
# check if the compressed tarball was created
tarball = base_name + '.tar.gz'
- self.assertTrue(os.path.exists(tarball))
+ self.assertTrue(os.path.isfile(tarball))
+ self.assertTrue(tarfile.is_tarfile(tarball))
+ with tarfile.open(tarball, 'r:gz') as tf:
+ self.assertCountEqual(tf.getnames(),
+ ['.', './sub', './sub2',
+ './file1', './file2', './sub/file3'])
# trying an uncompressed one
base_name = os.path.join(tmpdir2, 'archive')
- with support.change_cwd(tmpdir):
- _make_tarball(splitdrive(base_name)[1], '.', compress=None)
+ make_archive(splitdrive(base_name)[1], 'tar', root_dir, '.')
tarball = base_name + '.tar'
- self.assertTrue(os.path.exists(tarball))
+ self.assertTrue(os.path.isfile(tarball))
+ self.assertTrue(tarfile.is_tarfile(tarball))
+ with tarfile.open(tarball, 'r') as tf:
+ self.assertCountEqual(tf.getnames(),
+ ['.', './sub', './sub2',
+ './file1', './file2', './sub/file3'])
def _tarinfo(self, path):
- tar = tarfile.open(path)
- try:
+ with tarfile.open(path) as tar:
names = tar.getnames()
names.sort()
return tuple(names)
- finally:
- tar.close()
- def _create_files(self):
+ def _create_files(self, base_dir='dist'):
# creating something to tar
- tmpdir = self.mkdtemp()
- dist = os.path.join(tmpdir, 'dist')
- os.mkdir(dist)
+ root_dir = self.mkdtemp()
+ dist = os.path.join(root_dir, base_dir)
+ os.makedirs(dist, exist_ok=True)
write_file((dist, 'file1'), 'xxx')
write_file((dist, 'file2'), 'xxx')
os.mkdir(os.path.join(dist, 'sub'))
write_file((dist, 'sub', 'file3'), 'xxx')
os.mkdir(os.path.join(dist, 'sub2'))
- tmpdir2 = self.mkdtemp()
- base_name = os.path.join(tmpdir2, 'archive')
- return tmpdir, tmpdir2, base_name
+ if base_dir:
+ write_file((root_dir, 'outer'), 'xxx')
+ return root_dir, base_dir
@requires_zlib
- @unittest.skipUnless(find_executable('tar') and find_executable('gzip'),
+ @unittest.skipUnless(find_executable('tar'),
'Need the tar command to run')
def test_tarfile_vs_tar(self):
- tmpdir, tmpdir2, base_name = self._create_files()
- with support.change_cwd(tmpdir):
- _make_tarball(base_name, 'dist')
+ root_dir, base_dir = self._create_files()
+ base_name = os.path.join(self.mkdtemp(), 'archive')
+ make_archive(base_name, 'gztar', root_dir, base_dir)
# check if the compressed tarball was created
tarball = base_name + '.tar.gz'
- self.assertTrue(os.path.exists(tarball))
+ self.assertTrue(os.path.isfile(tarball))
# now create another tarball using `tar`
- tarball2 = os.path.join(tmpdir, 'archive2.tar.gz')
- tar_cmd = ['tar', '-cf', 'archive2.tar', 'dist']
- gzip_cmd = ['gzip', '-f9', 'archive2.tar']
- with support.change_cwd(tmpdir):
- with captured_stdout() as s:
- spawn(tar_cmd)
- spawn(gzip_cmd)
-
- self.assertTrue(os.path.exists(tarball2))
+ tarball2 = os.path.join(root_dir, 'archive2.tar')
+ tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
+ with support.change_cwd(root_dir), captured_stdout():
+ spawn(tar_cmd)
+
+ self.assertTrue(os.path.isfile(tarball2))
# let's compare both tarballs
self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
# trying an uncompressed one
- base_name = os.path.join(tmpdir2, 'archive')
- with support.change_cwd(tmpdir):
- _make_tarball(base_name, 'dist', compress=None)
+ make_archive(base_name, 'tar', root_dir, base_dir)
tarball = base_name + '.tar'
- self.assertTrue(os.path.exists(tarball))
+ self.assertTrue(os.path.isfile(tarball))
# now for a dry_run
- base_name = os.path.join(tmpdir2, 'archive')
- with support.change_cwd(tmpdir):
- _make_tarball(base_name, 'dist', compress=None, dry_run=True)
+ make_archive(base_name, 'tar', root_dir, base_dir, dry_run=True)
tarball = base_name + '.tar'
- self.assertTrue(os.path.exists(tarball))
+ self.assertTrue(os.path.isfile(tarball))
@requires_zlib
@unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
def test_make_zipfile(self):
- # creating something to tar
- tmpdir = self.mkdtemp()
- write_file((tmpdir, 'file1'), 'xxx')
- write_file((tmpdir, 'file2'), 'xxx')
-
- tmpdir2 = self.mkdtemp()
- # force shutil to create the directory
- os.rmdir(tmpdir2)
- base_name = os.path.join(tmpdir2, 'archive')
- _make_zipfile(base_name, tmpdir)
+ # creating something to zip
+ root_dir, base_dir = self._create_files()
+ base_name = os.path.join(self.mkdtemp(), 'archive')
+ res = make_archive(base_name, 'zip', root_dir, 'dist')
- # check if the compressed tarball was created
- tarball = base_name + '.zip'
- self.assertTrue(os.path.exists(tarball))
+ self.assertEqual(res, base_name + '.zip')
+ self.assertTrue(os.path.isfile(res))
+ self.assertTrue(zipfile.is_zipfile(res))
+ with zipfile.ZipFile(res) as zf:
+ self.assertCountEqual(zf.namelist(),
+ ['dist/file1', 'dist/file2', 'dist/sub/file3'])
def test_make_archive(self):
else:
group = owner = 'root'
- base_dir, root_dir, base_name = self._create_files()
- base_name = os.path.join(self.mkdtemp() , 'archive')
+ root_dir, base_dir = self._create_files()
+ base_name = os.path.join(self.mkdtemp(), 'archive')
res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
group=group)
- self.assertTrue(os.path.exists(res))
+ self.assertTrue(os.path.isfile(res))
res = make_archive(base_name, 'zip', root_dir, base_dir)
- self.assertTrue(os.path.exists(res))
+ self.assertTrue(os.path.isfile(res))
res = make_archive(base_name, 'tar', root_dir, base_dir,
owner=owner, group=group)
- self.assertTrue(os.path.exists(res))
+ self.assertTrue(os.path.isfile(res))
res = make_archive(base_name, 'tar', root_dir, base_dir,
owner='kjhkjhkjg', group='oihohoh')
- self.assertTrue(os.path.exists(res))
+ self.assertTrue(os.path.isfile(res))
@requires_zlib
@unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
def test_tarfile_root_owner(self):
- tmpdir, tmpdir2, base_name = self._create_files()
+ root_dir, base_dir = self._create_files()
+ base_name = os.path.join(self.mkdtemp(), 'archive')
group = grp.getgrgid(0)[0]
owner = pwd.getpwuid(0)[0]
- with support.change_cwd(tmpdir):
- archive_name = _make_tarball(base_name, 'dist', compress=None,
- owner=owner, group=group)
+ with support.change_cwd(root_dir):
+ archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
+ owner=owner, group=group)
# check if the compressed tarball was created
- self.assertTrue(os.path.exists(archive_name))
+ self.assertTrue(os.path.isfile(archive_name))
# now checks the rights
archive = tarfile.open(archive_name)
formats = [name for name, params in get_archive_formats()]
self.assertNotIn('xxx', formats)
- def _compare_dirs(self, dir1, dir2):
- # check that dir1 and dir2 are equivalent,
- # return the diff
- diff = []
- for root, dirs, files in os.walk(dir1):
- for file_ in files:
- path = os.path.join(root, file_)
- target_path = os.path.join(dir2, os.path.split(path)[-1])
- if not os.path.exists(target_path):
- diff.append(file_)
- return diff
-
@requires_zlib
def test_unpack_archive(self):
formats = ['tar', 'gztar', 'zip']
if BZ2_SUPPORTED:
formats.append('bztar')
+ root_dir, base_dir = self._create_files()
for format in formats:
- tmpdir = self.mkdtemp()
- base_dir, root_dir, base_name = self._create_files()
- tmpdir2 = self.mkdtemp()
+ expected = rlistdir(root_dir)
+ expected.remove('outer')
+ if format == 'zip':
+ expected.remove('dist/sub2/')
+ base_name = os.path.join(self.mkdtemp(), 'archive')
filename = make_archive(base_name, format, root_dir, base_dir)
# let's try to unpack it now
+ tmpdir2 = self.mkdtemp()
unpack_archive(filename, tmpdir2)
- diff = self._compare_dirs(tmpdir, tmpdir2)
- self.assertEqual(diff, [])
+ self.assertEqual(rlistdir(tmpdir2), expected)
# and again, this time with the format specified
tmpdir3 = self.mkdtemp()
unpack_archive(filename, tmpdir3, format=format)
- diff = self._compare_dirs(tmpdir, tmpdir3)
- self.assertEqual(diff, [])
+ self.assertEqual(rlistdir(tmpdir3), expected)
self.assertRaises(shutil.ReadError, unpack_archive, TESTFN)
self.assertRaises(ValueError, unpack_archive, TESTFN, format='xxx')