def setUp(self):
zipimport._zip_directory_cache.clear()
zipimport._zip_stat_cache.clear()
+ # save sys.modules so we can unimport everything done by our tests.
+ self._sys_modules_orig = dict(sys.modules)
ImportHooksBaseTestCase.setUp(self)
def tearDown(self):
ImportHooksBaseTestCase.tearDown(self)
+ # The closest we can come to un-importing our zipped up test modules.
+ sys.modules.clear()
+ sys.modules.update(self._sys_modules_orig)
if os.path.exists(TEMP_ZIP):
os.remove(TEMP_ZIP)
- def testZipFileChangesAfterFirstImport(self):
- """Alter the zip file after caching its index and try an import."""
+ def setUpZipFileModuleAndTestImports(self):
+ # Create a .zip file to test with
+ self.zipfile_path = TEMP_ZIP
packdir = TESTPACK + os.sep
files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc),
packdir + TESTMOD + ".py": (NOW, "test_value = 38\n"),
"ziptest_a.py": (NOW, "test_value = 23\n"),
"ziptest_b.py": (NOW, "test_value = 42\n"),
"ziptest_c.py": (NOW, "test_value = 1337\n")}
- zipfile_path = TEMP_ZIP
- _write_zip_package(zipfile_path, files)
- self.assertTrue(os.path.exists(zipfile_path))
- sys.path.insert(0, zipfile_path)
+ _write_zip_package(self.zipfile_path, files)
+ self.assertTrue(os.path.exists(self.zipfile_path))
+ sys.path.insert(0, self.zipfile_path)
+
+ self.testpack_testmod = TESTPACK + "." + TESTMOD
+
+ with io.open(self.zipfile_path, "rb") as orig_zip_file:
+ self.orig_zip_file_contents = orig_zip_file.read()
# Import something out of the zipfile and confirm it is correct.
- testmod = __import__(TESTPACK + "." + TESTMOD,
+ testmod = __import__(self.testpack_testmod,
globals(), locals(), ["__dummy__"])
self.assertEqual(testmod.test_value, 38)
+ del sys.modules[TESTPACK]
+ del sys.modules[self.testpack_testmod]
+
# Import something else out of the zipfile and confirm it is correct.
ziptest_b = __import__("ziptest_b", globals(), locals(), ["test_value"])
self.assertEqual(ziptest_b.test_value, 42)
+ del sys.modules["ziptest_b"]
- # Truncate and fill the zip file with non-zip garbage.
- with io.open(zipfile_path, "rb") as orig_zip_file:
- orig_zip_file_contents = orig_zip_file.read()
- with io.open(zipfile_path, "wb") as byebye_valid_zip_file:
+ def truncateAndFillZipWithNonZipGarbage(self):
+ with io.open(self.zipfile_path, "wb") as byebye_valid_zip_file:
byebye_valid_zip_file.write(b"Tear down this wall!\n"*1987)
+
+ def restoreZipFileWithDifferentHeaderOffsets(self):
+ """Make it a valid zipfile with some garbage at the start."""
+ # This alters all of the caches offsets within the file.
+ with io.open(self.zipfile_path, "wb") as new_zip_file:
+ new_zip_file.write(b"X"*1991) # The year Python was created.
+ new_zip_file.write(self.orig_zip_file_contents)
+
+ def testZipFileChangesAfterFirstImport(self):
+ """Alter the zip file after caching its index and try an import."""
+ self.setUpZipFileModuleAndTestImports()
+ # The above call cached the .zip table of contents during its tests.
+ self.truncateAndFillZipWithNonZipGarbage()
# Now that the zipfile has been replaced, import something else from it
# which should fail as the file contents are now garbage.
with self.assertRaises(ImportError):
ziptest_a = __import__("ziptest_a", globals(), locals(),
["test_value"])
+ # The code path used by the __import__ call is different than
+ # that used by import statements. Try these as well. Some of
+ # these may create new zipimporter instances. We need to
+ # function properly using the global zipimport caches
+ # regardless of how many zipimporter instances for the same
+ # .zip file exist.
+ with self.assertRaises(ImportError):
+ import ziptest_a
+ with self.assertRaises(ImportError):
+ from ziptest_a import test_value
+ with self.assertRaises(ImportError):
+ exec("from {} import {}".format(TESTPACK, TESTMOD))
- # Now lets make it a valid zipfile that has some garbage at the start.
- # This alters all of the offsets within the file
- with io.open(zipfile_path, "wb") as new_zip_file:
- new_zip_file.write(b"X"*1991) # The year Python was created.
- new_zip_file.write(orig_zip_file_contents)
+ # Alters all of the offsets within the file
+ self.restoreZipFileWithDifferentHeaderOffsets()
# Now that the zip file has been "restored" to a valid but different
- # zipfile the zipimporter should *successfully* re-read the new zip
- # file's end of file central index and be able to import from it again.
+ # zipfile all zipimporter instances should *successfully* re-read the
+ # new file's end of file central index and be able to import again.
+
+ # Importing a submodule triggers a different import code path.
+ exec("import " + self.testpack_testmod)
+ self.assertEqual(getattr(locals()[TESTPACK], TESTMOD).test_value, 38)
+ exec("from {} import {}".format(TESTPACK, TESTMOD))
+ self.assertEqual(locals()[TESTMOD].test_value, 38)
+
ziptest_a = __import__("ziptest_a", globals(), locals(), ["test_value"])
self.assertEqual(ziptest_a.test_value, 23)
ziptest_c = __import__("ziptest_c", globals(), locals(), ["test_value"])
self.assertEqual(ziptest_c.test_value, 1337)
+ def testZipFileSubpackageImport(self):
+ """Import via multiple sys.path entries into parts of the zip."""
+ self.setUpZipFileModuleAndTestImports()
+ # Put a subdirectory within the zip file into the import path.
+ sys.path.insert(0, self.zipfile_path + os.sep + TESTPACK)
+
+ testmod = __import__(TESTMOD, globals(), locals(), ["test_value"])
+ self.assertEqual(testmod.test_value, 38)
+ del sys.modules[TESTMOD]
+ exec("from {} import test_value".format(TESTMOD))
+ self.assertEqual(test_value, 38)
+ del sys.modules[TESTMOD]
+
+ # Confirm that imports from the top level of the zip file
+ # (already in sys.path from the setup call above) still work.
+ ziptest_a = __import__("ziptest_a", globals(), locals(), ["test_value"])
+ self.assertEqual(ziptest_a.test_value, 23)
+ del sys.modules["ziptest_a"]
+ import ziptest_c
+ self.assertEqual(ziptest_c.test_value, 1337)
+ del sys.modules["ziptest_c"]
+
+ self.truncateAndFillZipWithNonZipGarbage()
+ # Imports should now fail.
+ with self.assertRaises(ImportError):
+ testmod = __import__(TESTMOD, globals(), locals(), ["test_value"])
+ with self.assertRaises(ImportError):
+ exec("from {} import test_value".format(TESTMOD))
+ with self.assertRaises(ImportError):
+ import ziptest_a
+
+ self.restoreZipFileWithDifferentHeaderOffsets()
+ # Imports should work again, the central directory TOC will be re-read.
+ testmod = __import__(TESTMOD, globals(), locals(), ["test_value"])
+ self.assertEqual(testmod.test_value, 38)
+ del sys.modules[TESTMOD]
+ exec("from {} import test_value".format(TESTMOD))
+ self.assertEqual(test_value, 38)
+
+ ziptest_a = __import__("ziptest_a", globals(), locals(), ["test_value"])
+ self.assertEqual(ziptest_a.test_value, 23)
+ import ziptest_c
+ self.assertEqual(ziptest_c.test_value, 1337)
+
class BadFileZipImportTestCase(unittest.TestCase):
def assertZipFailure(self, filename):
static int
zipimporter_init(ZipImporter *self, PyObject *args, PyObject *kwds)
{
- char *path, *p, *prefix, buf[MAXPATHLEN+2];
+ char *path_arg, *path, *p, *prefix, *path_buf;
size_t len;
if (!_PyArg_NoKeywords("zipimporter()", kwds))
return -1;
- if (!PyArg_ParseTuple(args, "s:zipimporter",
- &path))
+ if (!PyArg_ParseTuple(args, "s:zipimporter", &path_arg))
return -1;
- len = strlen(path);
+ len = strlen(path_arg);
if (len == 0) {
PyErr_SetString(ZipImportError, "archive path is empty");
return -1;
}
if (len >= MAXPATHLEN) {
- PyErr_SetString(ZipImportError,
- "archive path too long");
+ PyErr_SetString(ZipImportError, "archive path too long");
return -1;
}
- strcpy(buf, path);
+ /* Room for the trailing \0 and room for an extra SEP if needed. */
+ path_buf = (char *)PyMem_Malloc(len + 2);
+ if (path_buf == NULL) {
+ PyErr_SetString(PyExc_MemoryError, "unable to malloc path buffer");
+ return -1;
+ }
+ strcpy(path_buf, path_arg);
#ifdef ALTSEP
- for (p = buf; *p; p++) {
+ for (p = path_buf; *p; p++) {
if (*p == ALTSEP)
*p = SEP;
}
struct stat statbuf;
int rv;
- rv = stat(buf, &statbuf);
+ rv = stat(path_buf, &statbuf);
if (rv == 0) {
/* it exists */
if (S_ISREG(statbuf.st_mode))
/* it's a file */
- path = buf;
+ path = path_buf;
break;
}
#else
- if (object_exists(buf)) {
+ if (object_exists(path_buf)) {
/* it exists */
- if (isfile(buf))
+ if (isfile(path_buf))
/* it's a file */
- path = buf;
+ path = path_buf;
break;
}
#endif
/* back up one path element */
- p = strrchr(buf, SEP);
+ p = strrchr(path_buf, SEP);
if (prefix != NULL)
*prefix = SEP;
if (p == NULL)
}
if (path != NULL) {
PyObject *files;
+ if (Py_VerboseFlag && prefix && prefix[0] != '\0')
+ PySys_WriteStderr("# zipimport: prefix=%s constructing a "
+ "zipimporter for %s\n", prefix, path);
+ /* NOTE(gps): test_zipimport.py never exercises a case where
+ * prefix is non-empty. When would that ever be possible?
+ * Are we missing coverage or is prefix simply never needed?
+ */
files = PyDict_GetItemString(zip_directory_cache, path);
if (files == NULL) {
PyObject *zip_stat = NULL;
- FILE *fp = fopen_rb_and_stat(buf, &zip_stat);
+ FILE *fp = fopen_rb_and_stat(path, &zip_stat);
if (fp == NULL) {
PyErr_Format(ZipImportError, "can't open Zip file: "
- "'%.200s'", buf);
+ "'%.200s'", path);
Py_XDECREF(zip_stat);
- return -1;
+ goto error;
}
if (Py_VerboseFlag)
PySys_WriteStderr("# zipimport: %s not cached, "
"reading TOC.\n", path);
- files = read_directory(fp, buf);
+ files = read_directory(fp, path);
fclose(fp);
if (files == NULL) {
Py_XDECREF(zip_stat);
- return -1;
+ goto error;
}
if (PyDict_SetItemString(zip_directory_cache, path,
files) != 0) {
Py_DECREF(files);
Py_XDECREF(zip_stat);
- return -1;
+ goto error;
}
if (zip_stat && PyDict_SetItemString(zip_stat_cache, path,
zip_stat) != 0) {
Py_DECREF(files);
Py_DECREF(zip_stat);
- return -1;
+ goto error;
}
Py_XDECREF(zip_stat);
}
- else
- Py_INCREF(files);
- self->files = files;
}
else {
PyErr_SetString(ZipImportError, "not a Zip file");
- return -1;
+ goto error;
}
if (prefix == NULL)
}
}
- self->archive = PyString_FromString(buf);
+ self->archive = PyString_FromString(path);
if (self->archive == NULL)
- return -1;
+ goto error;
self->prefix = PyString_FromString(prefix);
if (self->prefix == NULL)
- return -1;
-
- return 0;
-}
+ goto error;
-/* GC support. */
-static int
-zipimporter_traverse(PyObject *obj, visitproc visit, void *arg)
-{
- ZipImporter *self = (ZipImporter *)obj;
- Py_VISIT(self->files);
+ PyMem_Free(path_buf);
return 0;
+error:
+ PyMem_Free(path_buf);
+ return -1;
}
static void
zipimporter_dealloc(ZipImporter *self)
{
- PyObject_GC_UnTrack(self);
Py_XDECREF(self->archive);
Py_XDECREF(self->prefix);
- Py_XDECREF(self->files);
Py_TYPE(self)->tp_free((PyObject *)self);
}
char *subname, path[MAXPATHLEN + 1];
int len;
struct st_zip_searchorder *zso;
+ PyObject *files;
subname = get_subname(fullname);
if (len < 0)
return MI_ERROR;
+ files = PyDict_GetItem(zip_directory_cache, self->archive);
+ if (files == NULL) {
+ /* Some scoundrel has cleared zip_directory_cache out from
+ * beneath us. Try repopulating it once before giving up. */
+ char *unused_archive_name;
+ FILE *fp = safely_reopen_archive(self, &unused_archive_name);
+ if (fp == NULL)
+ return MI_ERROR;
+ fclose(fp);
+ files = PyDict_GetItem(zip_directory_cache, self->archive);
+ if (files == NULL)
+ return MI_ERROR;
+ }
+
for (zso = zip_searchorder; *zso->suffix; zso++) {
strcpy(path + len, zso->suffix);
- if (PyDict_GetItemString(self->files, path) != NULL) {
+ if (PyDict_GetItemString(files, path) != NULL) {
if (zso->type & IS_PACKAGE)
return MI_PACKAGE;
else
#ifdef ALTSEP
char *p, buf[MAXPATHLEN + 1];
#endif
- PyObject *toc_entry, *data;
+ PyObject *toc_entry, *data, *files;
Py_ssize_t len;
if (!PyArg_ParseTuple(args, "s:zipimporter.get_data", &path))
if (fp == NULL)
return NULL;
- toc_entry = PyDict_GetItemString(self->files, path);
+ files = PyDict_GetItem(zip_directory_cache, self->archive);
+ if (files == NULL) {
+ /* This should never happen as safely_reopen_archive() should
+ * have repopulated zip_directory_cache if needed. */
+ return NULL;
+ }
+ toc_entry = PyDict_GetItemString(files, path);
if (toc_entry == NULL) {
PyErr_SetFromErrnoWithFilename(PyExc_IOError, path);
fclose(fp);
zipimporter_get_source(PyObject *obj, PyObject *args)
{
ZipImporter *self = (ZipImporter *)obj;
- PyObject *toc_entry;
+ PyObject *toc_entry, *files;
FILE *fp;
char *fullname, *subname, path[MAXPATHLEN+1], *archive;
int len;
if (fp == NULL)
return NULL;
- toc_entry = PyDict_GetItemString(self->files, path);
+ files = PyDict_GetItem(zip_directory_cache, self->archive);
+ if (files == NULL) {
+ /* This should never happen as safely_reopen_archive() should
+ * have repopulated zip_directory_cache if needed. */
+ return NULL;
+ }
+ toc_entry = PyDict_GetItemString(files, path);
if (toc_entry != NULL) {
PyObject *data = get_data(fp, archive, toc_entry);
fclose(fp);
PyObject_GenericGetAttr, /* tp_getattro */
0, /* tp_setattro */
0, /* tp_as_buffer */
- Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE |
- Py_TPFLAGS_HAVE_GC, /* tp_flags */
+ Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */
zipimporter_doc, /* tp_doc */
- zipimporter_traverse, /* tp_traverse */
+ 0, /* tp_traverse */
0, /* tp_clear */
0, /* tp_richcompare */
0, /* tp_weaklistoffset */
(initproc)zipimporter_init, /* tp_init */
PyType_GenericAlloc, /* tp_alloc */
PyType_GenericNew, /* tp_new */
- PyObject_GC_Del, /* tp_free */
+ 0, /* tp_free */
};
fclose(fp);
return NULL;
}
- Py_XDECREF(self->files); /* free the old value. */
- self->files = files;
}
Py_DECREF(stat_now);
- } /* stat succeeded */
+ } else {
+ if (Py_VerboseFlag)
+ PySys_WriteStderr("# zipimport: os.fstat failed on the "
+ "open %s file.\n", archive);
+ }
return fp;
}
static time_t
get_mtime_of_source(ZipImporter *self, char *path)
{
- PyObject *toc_entry;
+ PyObject *toc_entry, *files;
time_t mtime = 0;
Py_ssize_t lastchar = strlen(path) - 1;
char savechar = path[lastchar];
path[lastchar] = '\0'; /* strip 'c' or 'o' from *.py[co] */
- toc_entry = PyDict_GetItemString(self->files, path);
+ files = PyDict_GetItem(zip_directory_cache, self->archive);
+ if (files == NULL) {
+ /* This should never happen as safely_reopen_archive() from
+ * our only caller repopulated zip_directory_cache if needed. */
+ return 0;
+ }
+ toc_entry = PyDict_GetItemString(files, path);
if (toc_entry != NULL && PyTuple_Check(toc_entry) &&
PyTuple_Size(toc_entry) == 8) {
/* fetch the time stamp of the .py file for comparison
return NULL;
for (zso = zip_searchorder; *zso->suffix; zso++) {
- PyObject *code = NULL;
+ PyObject *code = NULL, *files;
strcpy(path + len, zso->suffix);
if (Py_VerboseFlag > 1)
PyString_AsString(self->archive),
SEP, path);
- toc_entry = PyDict_GetItemString(self->files, path);
+ files = PyDict_GetItem(zip_directory_cache, self->archive);
+ if (files == NULL) {
+ /* This should never happen as safely_reopen_archive() should
+ * have repopulated zip_directory_cache if needed; and the GIL
+ * is being held. */
+ return NULL;
+ }
+ toc_entry = PyDict_GetItemString(files, path);
if (toc_entry != NULL) {
time_t mtime = 0;
int ispackage = zso->type & IS_PACKAGE;