try:
n = int(nts(s, "ascii", "strict") or "0", 8)
except ValueError:
- raise HeaderError("invalid header")
+ raise InvalidHeaderError("invalid header")
else:
n = 0
for i in range(len(s) - 1):
"""Exception for unsupported operations on stream-like TarFiles."""
pass
class HeaderError(TarError):
+ """Base exception for header errors."""
+ pass
+class EmptyHeaderError(HeaderError):
+ """Exception for empty headers."""
+ pass
+class TruncatedHeaderError(HeaderError):
+ """Exception for truncated headers."""
+ pass
+class EOFHeaderError(HeaderError):
+ """Exception for end of file headers."""
+ pass
+class InvalidHeaderError(HeaderError):
"""Exception for invalid headers."""
pass
+class SubsequentHeaderError(HeaderError):
+ """Exception for missing and invalid extended headers."""
+ pass
#---------------------------
# internal stream interface
def frombuf(cls, buf, encoding, errors):
"""Construct a TarInfo object from a 512 byte bytes object.
"""
+ if len(buf) == 0:
+ raise EmptyHeaderError("empty header")
if len(buf) != BLOCKSIZE:
- raise HeaderError("truncated header")
+ raise TruncatedHeaderError("truncated header")
if buf.count(NUL) == BLOCKSIZE:
- raise HeaderError("empty header")
+ raise EOFHeaderError("end of file header")
chksum = nti(buf[148:156])
if chksum not in calc_chksums(buf):
- raise HeaderError("bad checksum")
+ raise InvalidHeaderError("bad checksum")
obj = cls()
obj.name = nts(buf[0:100], encoding, errors)
tarfile.
"""
buf = tarfile.fileobj.read(BLOCKSIZE)
- if not buf:
- return
obj = cls.frombuf(buf, tarfile.encoding, tarfile.errors)
obj.offset = tarfile.fileobj.tell() - BLOCKSIZE
return obj._proc_member(tarfile)
buf = tarfile.fileobj.read(self._block(self.size))
# Fetch the next header and process it.
- next = self.fromtarfile(tarfile)
- if next is None:
- raise HeaderError("missing subsequent header")
+ try:
+ next = self.fromtarfile(tarfile)
+ except HeaderError:
+ raise SubsequentHeaderError("missing or bad subsequent header")
# Patch the TarInfo object from the next header with
# the longname information.
pos += length
# Fetch the next header.
- next = self.fromtarfile(tarfile)
+ try:
+ next = self.fromtarfile(tarfile)
+ except HeaderError:
+ raise SubsequentHeaderError("missing or bad subsequent header")
if self.type in (XHDTYPE, SOLARIS_XHDTYPE):
- if next is None:
- raise HeaderError("missing subsequent header")
-
# Patch the TarInfo object with the extended header info.
next._apply_pax_info(pax_headers, tarfile.encoding, tarfile.errors)
next.offset = self.offset
if self.mode == "a":
# Move to the end of the archive,
# before the first empty block.
- self.firstmember = None
while True:
- if self.next() is None:
- if self.offset > 0:
- self.fileobj.seek(self.fileobj.tell() - BLOCKSIZE)
+ self.fileobj.seek(self.offset)
+ try:
+ tarinfo = self.tarinfo.fromtarfile(self)
+ self.members.append(tarinfo)
+ except EOFHeaderError:
+ self.fileobj.seek(self.offset)
break
+ except HeaderError as e:
+ raise ReadError(str(e))
if self.mode in "aw":
self._loaded = True
try:
t = cls.taropen(name, mode, fileobj, **kwargs)
- except IOError:
+ except (IOError, EOFError):
raise ReadError("not a bzip2 file")
t._extfileobj = False
return t
# Read the next block.
self.fileobj.seek(self.offset)
+ tarinfo = None
while True:
try:
tarinfo = self.tarinfo.fromtarfile(self)
- if tarinfo is None:
- return
- self.members.append(tarinfo)
-
- except HeaderError as e:
+ except EOFHeaderError as e:
if self.ignore_zeros:
self._dbg(2, "0x%X: %s" % (self.offset, e))
self.offset += BLOCKSIZE
continue
- else:
- if self.offset == 0:
- raise ReadError(str(e))
- return None
+ except InvalidHeaderError as e:
+ if self.ignore_zeros:
+ self._dbg(2, "0x%X: %s" % (self.offset, e))
+ self.offset += BLOCKSIZE
+ continue
+ elif self.offset == 0:
+ raise ReadError(str(e))
+ except EmptyHeaderError:
+ if self.offset == 0:
+ raise ReadError("empty file")
+ except TruncatedHeaderError as e:
+ if self.offset == 0:
+ raise ReadError(str(e))
+ except SubsequentHeaderError as e:
+ raise ReadError(str(e))
break
+ if tarinfo is not None:
+ self.members.append(tarinfo)
+ else:
+ self._loaded = True
+
return tarinfo
#--------------------------------------------------------------------------
fobj.close()
-class MiscReadTest(ReadTest):
+class CommonReadTest(ReadTest):
+
+ def test_empty_tarfile(self):
+ # Test for issue6123: Allow opening empty archives.
+ # This test checks if tarfile.open() is able to open an empty tar
+ # archive successfully. Note that an empty tar archive is not the
+ # same as an empty file!
+ tarfile.open(tmpname, self.mode.replace("r", "w")).close()
+ try:
+ tar = tarfile.open(tmpname, self.mode)
+ tar.getnames()
+ except tarfile.ReadError:
+ self.fail("tarfile.open() failed on empty archive")
+ self.assertListEqual(tar.getmembers(), [])
+
+ def test_null_tarfile(self):
+ # Test for issue6123: Allow opening empty archives.
+ # This test guarantees that tarfile.open() does not treat an empty
+ # file as an empty tar archive.
+ open(tmpname, "wb").close()
+ self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
+ self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
+
+ def test_ignore_zeros(self):
+ # Test TarFile's ignore_zeros option.
+ if self.mode.endswith(":gz"):
+ _open = gzip.GzipFile
+ elif self.mode.endswith(":bz2"):
+ _open = bz2.BZ2File
+ else:
+ _open = open
+
+ for char in (b'\0', b'a'):
+ # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
+ # are ignored correctly.
+ fobj = _open(tmpname, "wb")
+ fobj.write(char * 1024)
+ fobj.write(tarfile.TarInfo("foo").tobuf())
+ fobj.close()
+
+ tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
+ self.assertListEqual(tar.getnames(), ["foo"],
+ "ignore_zeros=True should have skipped the %r-blocks" % char)
+ tar.close()
+
+
+class MiscReadTest(CommonReadTest):
def test_no_name_argument(self):
fobj = open(self.tarname, "rb")
tar.close()
-class StreamReadTest(ReadTest):
+class StreamReadTest(CommonReadTest):
mode="r|"
self._test()
def test_empty(self):
- open(self.tarname, "w").close()
+ tarfile.open(self.tarname, "w:").close()
self._add_testfile()
self._test()
def test_empty_fileobj(self):
- fobj = io.BytesIO()
+ fobj = io.BytesIO(b"\0" * 1024)
self._add_testfile(fobj)
fobj.seek(0)
self._test(fileobj=fobj)
self._create_testtar("w:bz2")
self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
+ # Append mode is supposed to fail if the tarfile to append to
+ # does not end with a zero block.
+ def _test_error(self, data):
+ open(self.tarname, "wb").write(data)
+ self.assertRaises(tarfile.ReadError, self._add_testfile)
+
+ def test_null(self):
+ self._test_error(b"")
+
+ def test_incomplete(self):
+ self._test_error(b"\0" * 13)
+
+ def test_premature_eof(self):
+ data = tarfile.TarInfo("foo").tobuf()
+ self._test_error(data)
+
+ def test_trailing_garbage(self):
+ data = tarfile.TarInfo("foo").tobuf()
+ self._test_error(data + b"\0" * 13)
+
+ def test_invalid(self):
+ self._test_error(b"a" * 512)
+
class LimitsTest(unittest.TestCase):
raise AssertionError("infinite loop detected in tarfile.open()")
self.hit_eof = self.tell() == len(self.getvalue())
return super(MyBytesIO, self).read(n)
+ def seek(self, *args):
+ self.hit_eof = False
+ return super(MyBytesIO, self).seek(*args)
data = bz2.compress(tarfile.TarInfo("foo").tobuf())
for x in range(len(data) + 1):
- tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
+ try:
+ tarfile.open(fileobj=MyBytesIO(data[:x]), mode=mode)
+ except tarfile.ReadError:
+ pass # we have no interest in ReadErrors
def test_partial_input(self):
self._test_partial_input("r")