From: Martin v. Löwis Date: Tue, 6 Mar 2007 10:41:24 +0000 (+0000) Subject: Patch #1121142: Implement ZipFile.open. X-Git-Tag: v2.6a1~2109 X-Git-Url: https://granicus.if.org/sourcecode?a=commitdiff_plain;h=3eb764898676c3d8fd4df5c40f62607f3155037c;p=python Patch #1121142: Implement ZipFile.open. --- diff --git a/Doc/lib/libzipfile.tex b/Doc/lib/libzipfile.tex index 06a236cef6..bfe5966ff3 100644 --- a/Doc/lib/libzipfile.tex +++ b/Doc/lib/libzipfile.tex @@ -141,6 +141,32 @@ cat myzip.zip >> python.exe Return a list of archive members by name. \end{methoddesc} +\begin{methoddesc}{open}{name\optional{, mode\optional{, pwd}}} + Extract a member from the archive as a file-like object (ZipExtFile). + \var{name} is the name of the file in the archive. The \var{mode} + parameter, if included, must be one of the following: \code{'r'} (the + default), \code{'U'}, or \code{'rU'}. Choosing \code{'U'} or + \code{'rU'} will enable universal newline support in the read-only + object. \var{pwd} is the password used for encrypted files. + \begin{notice} + The file-like object is read-only and provides the following methods: + \method{read()}, \method{readline()}, \method{readlines()}, + \method{__iter__()}, \method{next()}. + \end{notice} + \begin{notice} + If the ZipFile was created by passing in a file-like object as the + first argument to the constructor, then the object returned by + \method{open()} shares the ZipFile's file pointer. Under these + circumstances, the object returned by \method{open()} should not + be used after any additional operations are performed on the + ZipFile object. If the ZipFile was created by passing in a string + (the filename) as the first argument to the constructor, then + \method{open()} will create a new file object that will be held + by the ZipExtFile, allowing it to operate independently of the + ZipFile. + \end{notice} +\end{methoddesc} + \begin{methoddesc}{printdir}{} Print a table of contents for the archive to \code{sys.stdout}. \end{methoddesc} diff --git a/Lib/test/test_zipfile.py b/Lib/test/test_zipfile.py index c17039f227..e0b255ce74 100644 --- a/Lib/test/test_zipfile.py +++ b/Lib/test/test_zipfile.py @@ -4,26 +4,29 @@ try: except ImportError: zlib = None -import zipfile, os, unittest, sys, shutil +import zipfile, os, unittest, sys, shutil, struct from StringIO import StringIO from tempfile import TemporaryFile +from random import randint, random from test.test_support import TESTFN, run_unittest TESTFN2 = TESTFN + "2" +FIXEDTEST_SIZE = 10 class TestsWithSourceFile(unittest.TestCase): def setUp(self): - line_gen = ("Test of zipfile line %d." % i for i in range(0, 1000)) - self.data = '\n'.join(line_gen) + self.line_gen = ("Zipfile test line %d. random float: %f" % (i, random()) + for i in xrange(FIXEDTEST_SIZE)) + self.data = '\n'.join(self.line_gen) + '\n' # Make a source file with some lines fp = open(TESTFN, "wb") fp.write(self.data) fp.close() - def zipTest(self, f, compression): + def makeTestArchive(self, f, compression): # Create the ZIP archive zipfp = zipfile.ZipFile(f, "w", compression) zipfp.write(TESTFN, "another"+os.extsep+"name") @@ -31,6 +34,9 @@ class TestsWithSourceFile(unittest.TestCase): zipfp.writestr("strfile", self.data) zipfp.close() + def zipTest(self, f, compression): + self.makeTestArchive(f, compression) + # Read the ZIP archive zipfp = zipfile.ZipFile(f, "r", compression) self.assertEqual(zipfp.read(TESTFN), self.data) @@ -85,22 +91,144 @@ class TestsWithSourceFile(unittest.TestCase): # Check that testzip doesn't raise an exception zipfp.testzip() + zipfp.close() + + def testStored(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipTest(f, zipfile.ZIP_STORED) + + def zipOpenTest(self, f, compression): + self.makeTestArchive(f, compression) + + # Read the ZIP archive + zipfp = zipfile.ZipFile(f, "r", compression) + zipdata1 = [] + zipopen1 = zipfp.open(TESTFN) + while 1: + read_data = zipopen1.read(256) + if not read_data: + break + zipdata1.append(read_data) + + zipdata2 = [] + zipopen2 = zipfp.open("another"+os.extsep+"name") + while 1: + read_data = zipopen2.read(256) + if not read_data: + break + zipdata2.append(read_data) + + self.assertEqual(''.join(zipdata1), self.data) + self.assertEqual(''.join(zipdata2), self.data) + zipfp.close() + + def testOpenStored(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipOpenTest(f, zipfile.ZIP_STORED) + + def zipRandomOpenTest(self, f, compression): + self.makeTestArchive(f, compression) + + # Read the ZIP archive + zipfp = zipfile.ZipFile(f, "r", compression) + zipdata1 = [] + zipopen1 = zipfp.open(TESTFN) + while 1: + read_data = zipopen1.read(randint(1, 1024)) + if not read_data: + break + zipdata1.append(read_data) + + self.assertEqual(''.join(zipdata1), self.data) + zipfp.close() + + def testRandomOpenStored(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipRandomOpenTest(f, zipfile.ZIP_STORED) + + def zipReadlineTest(self, f, compression): + self.makeTestArchive(f, compression) + # Read the ZIP archive + zipfp = zipfile.ZipFile(f, "r") + zipopen = zipfp.open(TESTFN) + for line in self.line_gen: + linedata = zipopen.readline() + self.assertEqual(linedata, line + '\n') zipfp.close() + def zipReadlinesTest(self, f, compression): + self.makeTestArchive(f, compression) + # Read the ZIP archive + zipfp = zipfile.ZipFile(f, "r") + ziplines = zipfp.open(TESTFN).readlines() + for line, zipline in zip(self.line_gen, ziplines): + self.assertEqual(zipline, line + '\n') + zipfp.close() - def testStored(self): + def zipIterlinesTest(self, f, compression): + self.makeTestArchive(f, compression) + + # Read the ZIP archive + zipfp = zipfile.ZipFile(f, "r") + for line, zipline in zip(self.line_gen, zipfp.open(TESTFN)): + self.assertEqual(zipline, line + '\n') + + zipfp.close() + + def testReadlineStored(self): for f in (TESTFN2, TemporaryFile(), StringIO()): - self.zipTest(f, zipfile.ZIP_STORED) + self.zipReadlineTest(f, zipfile.ZIP_STORED) + + def testReadlinesStored(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipReadlinesTest(f, zipfile.ZIP_STORED) + + def testIterlinesStored(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipIterlinesTest(f, zipfile.ZIP_STORED) if zlib: def testDeflated(self): for f in (TESTFN2, TemporaryFile(), StringIO()): self.zipTest(f, zipfile.ZIP_DEFLATED) + def testOpenDeflated(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipOpenTest(f, zipfile.ZIP_DEFLATED) + + def testRandomOpenDeflated(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipRandomOpenTest(f, zipfile.ZIP_DEFLATED) + + def testReadlineDeflated(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipReadlineTest(f, zipfile.ZIP_DEFLATED) + + def testReadlinesDeflated(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipReadlinesTest(f, zipfile.ZIP_DEFLATED) + + def testIterlinesDeflated(self): + for f in (TESTFN2, TemporaryFile(), StringIO()): + self.zipIterlinesTest(f, zipfile.ZIP_DEFLATED) + + def testLowCompression(self): + # Checks for cases where compressed data is larger than original + # Create the ZIP archive + zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_DEFLATED) + zipfp.writestr("strfile", '12') + zipfp.close() + + # Get an open object for strfile + zipfp = zipfile.ZipFile(TESTFN2, "r", zipfile.ZIP_DEFLATED) + openobj = zipfp.open("strfile") + self.assertEqual(openobj.read(1), '1') + self.assertEqual(openobj.read(1), '2') + def testAbsoluteArcnames(self): zipfp = zipfile.ZipFile(TESTFN2, "w", zipfile.ZIP_STORED) zipfp.write(TESTFN, "/absolute") @@ -110,7 +238,6 @@ class TestsWithSourceFile(unittest.TestCase): self.assertEqual(zipfp.namelist(), ["absolute"]) zipfp.close() - def tearDown(self): os.remove(TESTFN) os.remove(TESTFN2) @@ -123,7 +250,7 @@ class TestZip64InSmallFiles(unittest.TestCase): self._limit = zipfile.ZIP64_LIMIT zipfile.ZIP64_LIMIT = 5 - line_gen = ("Test of zipfile line %d." % i for i in range(0, 1000)) + line_gen = ("Test of zipfile line %d." % i for i in range(0, FIXEDTEST_SIZE)) self.data = '\n'.join(line_gen) # Make a source file with some lines @@ -344,6 +471,26 @@ class OtherTests(unittest.TestCase): except zipfile.BadZipfile: os.unlink(TESTFN) + def testIsZipErroneousFile(self): + # This test checks that the is_zipfile function correctly identifies + # a file that is not a zip file + fp = open(TESTFN, "w") + fp.write("this is not a legal zip file\n") + fp.close() + chk = zipfile.is_zipfile(TESTFN) + os.unlink(TESTFN) + self.assert_(chk is False) + + def testIsZipValidFile(self): + # This test checks that the is_zipfile function correctly identifies + # a file that is a zip file + zipf = zipfile.ZipFile(TESTFN, mode="w") + zipf.writestr("foo.txt", "O, for a Muse of Fire!") + zipf.close() + chk = zipfile.is_zipfile(TESTFN) + os.unlink(TESTFN) + self.assert_(chk is True) + def testNonExistentFileRaisesIOError(self): # make sure we don't raise an AttributeError when a partially-constructed # ZipFile instance is finalized; this tests for regression on SF tracker @@ -371,7 +518,6 @@ class OtherTests(unittest.TestCase): # and report that the first file in the archive was corrupt. self.assertRaises(RuntimeError, zipf.testzip) - class DecryptionTests(unittest.TestCase): # This test checks that ZIP decryption works. Since the library does not # support encryption at the moment, we use a pre-generated encrypted @@ -411,9 +557,255 @@ class DecryptionTests(unittest.TestCase): self.zip.setpassword("python") self.assertEquals(self.zip.read("test.txt"), self.plain) + +class TestsWithRandomBinaryFiles(unittest.TestCase): + def setUp(self): + datacount = randint(16, 64)*1024 + randint(1, 1024) + self.data = ''.join((struct.pack('= 0: + nllen = len(sep) + return nl, nllen + + return nl, nllen + + def readline(self, size = -1): + """Read a line with approx. size. If size is negative, + read a whole line. + """ + if size < 0: + size = sys.maxint + elif size == 0: + return '' + + # check for a newline already in buffer + nl, nllen = self._checkfornewline() + + if nl >= 0: + # the next line was already in the buffer + nl = min(nl, size) + else: + # no line break in buffer - try to read more + size -= len(self.linebuffer) + while nl < 0 and size > 0: + buf = self.read(min(size, 100)) + if not buf: + break + self.linebuffer += buf + size -= len(buf) + + # check for a newline in buffer + nl, nllen = self._checkfornewline() + + # we either ran out of bytes in the file, or + # met the specified size limit without finding a newline, + # so return current buffer + if nl < 0: + s = self.linebuffer + self.linebuffer = '' + return s + + buf = self.linebuffer[:nl] + self.lastdiscard = self.linebuffer[nl:nl + nllen] + self.linebuffer = self.linebuffer[nl + nllen:] + + # line is always returned with \n as newline char (except possibly + # for a final incomplete line in the file, which is handled above). + return buf + "\n" + + def readlines(self, sizehint = -1): + """Return a list with all (following) lines. The sizehint parameter + is ignored in this implementation. + """ + result = [] + while True: + line = self.readline() + if not line: break + result.append(line) + return result + + def read(self, size = None): + # act like file() obj and return empty string if size is 0 + if size == 0: + return '' + + # determine read size + bytesToRead = self.compress_size - self.bytes_read + + # adjust read size for encrypted files since the first 12 bytes + # are for the encryption/password information + if self.decrypter is not None: + bytesToRead -= 12 + + if size is not None and size >= 0: + if self.compress_type == ZIP_STORED: + lr = len(self.readbuffer) + bytesToRead = min(bytesToRead, size - lr) + elif self.compress_type == ZIP_DEFLATED: + if len(self.readbuffer) > size: + # the user has requested fewer bytes than we've already + # pulled through the decompressor; don't read any more + bytesToRead = 0 + else: + # user will use up the buffer, so read some more + lr = len(self.rawbuffer) + bytesToRead = min(bytesToRead, self.compreadsize - lr) + + # avoid reading past end of file contents + if bytesToRead + self.bytes_read > self.compress_size: + bytesToRead = self.compress_size - self.bytes_read + + # try to read from file (if necessary) + if bytesToRead > 0: + bytes = self.fileobj.read(bytesToRead) + self.bytes_read += len(bytes) + self.rawbuffer += bytes + + # handle contents of raw buffer + if self.rawbuffer: + newdata = self.rawbuffer + self.rawbuffer = '' + + # decrypt new data if we were given an object to handle that + if newdata and self.decrypter is not None: + newdata = ''.join(map(self.decrypter, newdata)) + + # decompress newly read data if necessary + if newdata and self.compress_type == ZIP_DEFLATED: + newdata = self.dc.decompress(newdata) + self.rawbuffer = self.dc.unconsumed_tail + if self.eof and len(self.rawbuffer) == 0: + # we're out of raw bytes (both from the file and + # the local buffer); flush just to make sure the + # decompressor is done + newdata += self.dc.flush() + # prevent decompressor from being used again + self.dc = None + + self.readbuffer += newdata + + + # return what the user asked for + if size is None or len(self.readbuffer) <= size: + bytes = self.readbuffer + self.readbuffer = '' + else: + bytes = self.readbuffer[:size] + self.readbuffer = self.readbuffer[size:] + + return bytes + + class ZipFile: """ Class with methods to open, read, write, close, list zip files. @@ -534,73 +728,75 @@ class ZipFile: def read(self, name, pwd=None): """Return file bytes (as a string) for name.""" - if self.mode not in ("r", "a"): - raise RuntimeError, 'read() requires mode "r" or "a"' + return self.open(name, "r", pwd).read() + + def open(self, name, mode="r", pwd=None): + """Return file-like object for 'name'.""" + if mode not in ("r", "U", "rU"): + raise RuntimeError, 'open() requires mode "r", "U", or "rU"' if not self.fp: raise RuntimeError, \ "Attempt to read ZIP archive that was already closed" + + # Only open a new file for instances where we were not + # given a file object in the constructor + if self._filePassed: + zef_file = self.fp + else: + zef_file = open(self.filename, 'rb') + + # Get info object for name zinfo = self.getinfo(name) - is_encrypted = zinfo.flag_bits & 0x1 - if is_encrypted: - if not pwd: - pwd = self.pwd - if not pwd: - raise RuntimeError, "File %s is encrypted, " \ - "password required for extraction" % name - filepos = self.fp.tell() - self.fp.seek(zinfo.header_offset, 0) + filepos = zef_file.tell() + + zef_file.seek(zinfo.header_offset, 0) # Skip the file header: - fheader = self.fp.read(30) + fheader = zef_file.read(30) if fheader[0:4] != stringFileHeader: raise BadZipfile, "Bad magic number for file header" fheader = struct.unpack(structFileHeader, fheader) - fname = self.fp.read(fheader[_FH_FILENAME_LENGTH]) + fname = zef_file.read(fheader[_FH_FILENAME_LENGTH]) if fheader[_FH_EXTRA_FIELD_LENGTH]: - self.fp.read(fheader[_FH_EXTRA_FIELD_LENGTH]) + zef_file.read(fheader[_FH_EXTRA_FIELD_LENGTH]) if fname != zinfo.orig_filename: raise BadZipfile, \ 'File name in directory "%s" and header "%s" differ.' % ( zinfo.orig_filename, fname) - bytes = self.fp.read(zinfo.compress_size) - # Go with decryption + # check for encrypted flag & handle password + is_encrypted = zinfo.flag_bits & 0x1 + zd = None if is_encrypted: + if not pwd: + pwd = self.pwd + if not pwd: + raise RuntimeError, "File %s is encrypted, " \ + "password required for extraction" % name + zd = _ZipDecrypter(pwd) # The first 12 bytes in the cypher stream is an encryption header # used to strengthen the algorithm. The first 11 bytes are # completely random, while the 12th contains the MSB of the CRC, # and is used to check the correctness of the password. + bytes = zef_file.read(12) h = map(zd, bytes[0:12]) if ord(h[11]) != ((zinfo.CRC>>24)&255): raise RuntimeError, "Bad password for file %s" % name - bytes = "".join(map(zd, bytes[12:])) - # Go with decompression - self.fp.seek(filepos, 0) - if zinfo.compress_type == ZIP_STORED: - pass - elif zinfo.compress_type == ZIP_DEFLATED: - if not zlib: - raise RuntimeError, \ - "De-compression requires the (missing) zlib module" - # zlib compress/decompress code by Jeremy Hylton of CNRI - dc = zlib.decompressobj(-15) - bytes = dc.decompress(bytes) - # need to feed in unused pad byte so that zlib won't choke - ex = dc.decompress('Z') + dc.flush() - if ex: - bytes = bytes + ex + + # build and return a ZipExtFile + if zd is None: + zef = ZipExtFile(zef_file, zinfo) else: - raise BadZipfile, \ - "Unsupported compression method %d for file %s" % \ - (zinfo.compress_type, name) - crc = binascii.crc32(bytes) - if crc != zinfo.CRC: - raise BadZipfile, "Bad CRC-32 for file %s" % name - return bytes + zef = ZipExtFile(zef_file, zinfo, zd) + + # set universal newlines on ZipExtFile if necessary + if "U" in mode: + zef.set_univ_newlines(True) + return zef def _writecheck(self, zinfo): """Check for errors before writing a file to the archive.""" diff --git a/Misc/NEWS b/Misc/NEWS index b0bc88930f..894f1cbe50 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -139,6 +139,8 @@ Core and builtins Library ------- +- Patch #1121142: Implement ZipFile.open. + - Taught setup.py how to locate Berkeley DB on Macs using MacPorts. - Added heapq.merge() for merging sorted input streams.