]> granicus.if.org Git - python/commitdiff
Test zipimporter a bit more. Also get working with -R :: option for finding ref...
authorNeal Norwitz <nnorwitz@gmail.com>
Mon, 23 Jan 2006 07:52:13 +0000 (07:52 +0000)
committerNeal Norwitz <nnorwitz@gmail.com>
Mon, 23 Jan 2006 07:52:13 +0000 (07:52 +0000)
Lib/test/test_zipimport.py

index 21bde6830ecc0ac796a41d99a831df8b9d81df0d..bd48c73842dfcbaf54f73220d9a0b16089485add 100644 (file)
@@ -4,6 +4,7 @@ import marshal
 import imp
 import struct
 import time
+import unittest
 
 import zlib # implied prerequisite
 from zipfile import ZipFile, ZipInfo, ZIP_STORED, ZIP_DEFLATED
@@ -13,6 +14,11 @@ from test.test_importhooks import ImportHooksBaseTestCase, test_src, test_co
 import zipimport
 
 
+# so we only run testAFakeZlib once if this test is run repeatedly
+# which happens when we look for ref leaks
+test_imported = False
+
+
 def make_pyc(co, mtime):
     data = marshal.dumps(co)
     if type(mtime) is type(0.0):
@@ -176,6 +182,37 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
                  packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
         self.doTest(pyc_ext, files, TESTPACK, TESTPACK2, TESTMOD)
 
+    def testZipImporterMethods(self):
+        packdir = TESTPACK + os.sep
+        packdir2 = packdir + TESTPACK2 + os.sep
+        files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
+                 packdir2 + "__init__" + pyc_ext: (NOW, test_pyc),
+                 packdir2 + TESTMOD + pyc_ext: (NOW, test_pyc)}
+
+        z = ZipFile(TEMP_ZIP, "w")
+        try:
+            for name, (mtime, data) in files.items():
+                zinfo = ZipInfo(name, time.localtime(mtime))
+                zinfo.compress_type = self.compression
+                z.writestr(zinfo, data)
+            z.close()
+
+            zi = zipimport.zipimporter(TEMP_ZIP)
+            self.assertEquals(zi.is_package(TESTPACK), True)
+            zi.load_module(TESTPACK)
+            
+            self.assertEquals(zi.is_package(packdir + '__init__'), False)
+            self.assertEquals(zi.is_package(packdir + TESTPACK2), True)
+            self.assertEquals(zi.is_package(packdir2 + TESTMOD), False)
+
+            mod_name = packdir2 + TESTMOD
+            mod = __import__(mod_name.replace('/', '.'))
+            self.assertEquals(zi.get_source(TESTPACK), None)
+            self.assertEquals(zi.get_source(mod_name), None)
+        finally:
+            z.close()
+            os.remove(TEMP_ZIP)
+
     def testGetData(self):
         z = ZipFile(TEMP_ZIP, "w")
         z.compression = self.compression
@@ -186,6 +223,7 @@ class UncompressedZipImportTestCase(ImportHooksBaseTestCase):
             z.close()
             zi = zipimport.zipimporter(TEMP_ZIP)
             self.assertEquals(data, zi.get_data(name))
+            self.assert_('zipimporter object' in repr(zi))
         finally:
             z.close()
             os.remove(TEMP_ZIP)
@@ -212,11 +250,91 @@ class CompressedZipImportTestCase(UncompressedZipImportTestCase):
     compression = ZIP_DEFLATED
 
 
+class BadFileZipImportTestCase(unittest.TestCase):
+    def assertZipFailure(self, filename):
+        self.assertRaises(zipimport.ZipImportError,
+                          zipimport.zipimporter, filename)
+
+    def testNoFile(self):
+        self.assertZipFailure('AdfjdkFJKDFJjdklfjs')
+
+    def testEmptyFilename(self):
+        self.assertZipFailure('')
+
+    def testBadArgs(self):
+        self.assertRaises(TypeError, zipimport.zipimporter, None)
+        self.assertRaises(TypeError, zipimport.zipimporter, TESTMOD, kwd=None)
+
+    def testFilenameTooLong(self):
+        self.assertZipFailure('A' * 33000)
+
+    def testEmptyFile(self):
+        test_support.unlink(TESTMOD)
+        open(TESTMOD, 'w+').close()
+        self.assertZipFailure(TESTMOD)
+
+    def testFileUnreadable(self):
+        test_support.unlink(TESTMOD)
+        fd = os.open(TESTMOD, os.O_CREAT, 000)
+        os.close(fd)
+        self.assertZipFailure(TESTMOD)
+
+    def testNotZipFile(self):
+        test_support.unlink(TESTMOD)
+        fp = open(TESTMOD, 'w+')
+        fp.write('a' * 22)
+        fp.close()
+        self.assertZipFailure(TESTMOD)
+
+    def testBogusZipFile(self):
+        test_support.unlink(TESTMOD)
+        fp = open(TESTMOD, 'w+')
+        fp.write(struct.pack('=I', 0x06054B50))
+        fp.write('a' * 18)
+        fp.close()
+        z = zipimport.zipimporter(TESTMOD)
+
+        try:
+            self.assertRaises(TypeError, z.find_module, None)
+            self.assertRaises(TypeError, z.load_module, None)
+            self.assertRaises(TypeError, z.is_package, None)
+            self.assertRaises(TypeError, z.get_code, None)
+            self.assertRaises(TypeError, z.get_data, None)
+            self.assertRaises(TypeError, z.get_source, None)
+
+            error = zipimport.ZipImportError
+            self.assertEqual(z.find_module('abc'), None)
+
+            self.assertRaises(error, z.load_module, 'abc')
+            self.assertRaises(error, z.get_code, 'abc')
+            self.assertRaises(IOError, z.get_data, 'abc')
+            self.assertRaises(error, z.get_source, 'abc')
+            self.assertRaises(error, z.is_package, 'abc')
+        finally:
+            zipimport._zip_directory_cache.clear()
+
+
+def cleanup():
+    # this is necessary if test is run repeated (like when finding leaks)
+    global test_imported
+    if test_imported:
+        zipimport._zip_directory_cache.clear()
+        if hasattr(UncompressedZipImportTestCase, 'testAFakeZlib'):
+            delattr(UncompressedZipImportTestCase, 'testAFakeZlib')
+        if hasattr(CompressedZipImportTestCase, 'testAFakeZlib'):
+            delattr(CompressedZipImportTestCase, 'testAFakeZlib')
+    test_imported = True
+
 def test_main():
-    test_support.run_unittest(
-        UncompressedZipImportTestCase,
-        CompressedZipImportTestCase
-    )
+    cleanup()
+    try:
+        test_support.run_unittest(
+              UncompressedZipImportTestCase,
+              CompressedZipImportTestCase,
+              BadFileZipImportTestCase,
+            )
+    finally:
+        test_support.unlink(TESTMOD)
 
 if __name__ == "__main__":
     test_main()