mode = self.mode + self.sep + "*"
self.tar = tarfile.open(tarname(self.comp), mode)
+class ReadFileobjTest(BaseTest):
+
+ def test_fileobj_with_offset(self):
+ # Skip the first member and store values from the second member
+ # of the testtar.
+ self.tar.next()
+ t = self.tar.next()
+ name = t.name
+ offset = t.offset
+ data = self.tar.extractfile(t).read()
+ self.tar.close()
+
+ # Open the testtar and seek to the offset of the second member.
+ if self.comp == "gz":
+ _open = gzip.GzipFile
+ elif self.comp == "bz2":
+ _open = bz2.BZ2File
+ else:
+ _open = open
+ fobj = _open(tarname(self.comp), "rb")
+ fobj.seek(offset)
+
+ # Test if the tarfile starts with the second member.
+ self.tar = tarfile.open(tarname(self.comp), "r:", fileobj=fobj)
+ t = self.tar.next()
+ self.assertEqual(t.name, name)
+ # Read to the end of fileobj and test if seeking back to the
+ # beginning works.
+ self.tar.getmembers()
+ self.assertEqual(self.tar.extractfile(t).read(), data,
+ "seek back did not work")
+
class WriteTest(BaseTest):
mode = 'w'
comp = "gz"
class ReadStreamAsteriskTestGzip(ReadStreamAsteriskTest):
comp = "gz"
+class ReadFileobjTestGzip(ReadFileobjTest):
+ comp = "gz"
# Filemode test cases
comp = "bz2"
class ReadStreamAsteriskTestBzip2(ReadStreamAsteriskTest):
comp = "bz2"
+ class ReadFileobjTestBzip2(ReadFileobjTest):
+ comp = "bz2"
# If importing gzip failed, discard the Gzip TestCases.
if not gzip:
ReadDetectFileobjTest,
ReadAsteriskTest,
ReadStreamAsteriskTest,
+ ReadFileobjTest,
WriteTest,
Write100Test,
WriteSize0Test,
ReadTestGzip, ReadStreamTestGzip,
WriteTestGzip, WriteStreamTestGzip,
ReadDetectTestGzip, ReadDetectFileobjTestGzip,
- ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip
+ ReadAsteriskTestGzip, ReadStreamAsteriskTestGzip,
+ ReadFileobjTestGzip
])
if bz2:
ReadTestBzip2, ReadStreamTestBzip2,
WriteTestBzip2, WriteStreamTestBzip2,
ReadDetectTestBzip2, ReadDetectFileobjTestBzip2,
- ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2
+ ReadAsteriskTestBzip2, ReadStreamAsteriskTestBzip2,
+ ReadFileobjTestBzip2
])
try:
test_support.run_unittest(*tests)