signed_chksum = 256 + sum(struct.unpack_from("148b8x356b", buf))
return unsigned_chksum, signed_chksum
-def copyfileobj(src, dst, length=None):
+def copyfileobj(src, dst, length=None, exception=OSError):
"""Copy length bytes from fileobj src to fileobj dst.
If length is None, copy the entire content.
"""
for b in range(blocks):
buf = src.read(BUFSIZE)
if len(buf) < BUFSIZE:
- raise OSError("end of file reached")
+ raise exception("unexpected end of data")
dst.write(buf)
if remainder != 0:
buf = src.read(remainder)
if len(buf) < remainder:
- raise OSError("end of file reached")
+ raise exception("unexpected end of data")
dst.write(buf)
return
length = min(size, stop - self.position)
if data:
self.fileobj.seek(offset + (self.position - start))
- buf += self.fileobj.read(length)
+ b = self.fileobj.read(length)
+ if len(b) != length:
+ raise ReadError("unexpected end of data")
+ buf += b
else:
buf += NUL * length
size -= length
if tarinfo.sparse is not None:
for offset, size in tarinfo.sparse:
target.seek(offset)
- copyfileobj(source, target, size)
+ copyfileobj(source, target, size, ReadError)
else:
- copyfileobj(source, target, tarinfo.size)
+ copyfileobj(source, target, tarinfo.size, ReadError)
target.seek(tarinfo.size)
target.truncate()
self.firstmember = None
return m
+ # Advance the file pointer.
+ if self.offset != self.fileobj.tell():
+ self.fileobj.seek(self.offset - 1)
+ if not self.fileobj.read(1):
+ raise ReadError("unexpected end of data")
+
# Read the next block.
- self.fileobj.seek(self.offset)
tarinfo = None
while True:
try:
finally:
tar.close()
+ def test_premature_end_of_archive(self):
+ for size in (512, 600, 1024, 1200):
+ with tarfile.open(tmpname, "w:") as tar:
+ t = tarfile.TarInfo("foo")
+ t.size = 1024
+ tar.addfile(t, io.BytesIO(b"a" * 1024))
+
+ with open(tmpname, "r+b") as fobj:
+ fobj.truncate(size)
+
+ with tarfile.open(tmpname) as tar:
+ with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
+ for t in tar:
+ pass
+
+ with tarfile.open(tmpname) as tar:
+ t = tar.next()
+
+ with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
+ tar.extract(t, TEMPDIR)
+
+ with self.assertRaisesRegex(tarfile.ReadError, "unexpected end of data"):
+ tar.extractfile(t).read()
class MiscReadTestBase(CommonReadTest):
def requires_name_attribute(self):