]> granicus.if.org Git - python/commitdiff
Check-in of the most essential parts of SF 589982 (tempfile.py
authorGuido van Rossum <guido@python.org>
Fri, 9 Aug 2002 16:14:33 +0000 (16:14 +0000)
committerGuido van Rossum <guido@python.org>
Fri, 9 Aug 2002 16:14:33 +0000 (16:14 +0000)
rewrite, by Zack Weinberg).  This replaces most code in tempfile.py
(please review!!!) and adds extensive unit tests for it.

This will cause some warnings in the test suite; I'll check those in
soon, and also the docs.

Lib/tempfile.py
Lib/test/test_tempfile.py

index eb8253c9103735733e951aed5e9ff5555fac9721..f3cc4819ef5b6b4fce5e7baa49c7f35aa9edca03 100644 (file)
-"""Temporary files and filenames."""
+"""Temporary files.
 
-# XXX This tries to be not UNIX specific, but I don't know beans about
-# how to choose a temp directory or filename on MS-DOS or other
-# systems so it may have to be changed...
+This module provides generic, low- and high-level interfaces for
+creating temporary files and directories.  The interfaces listed
+as "safe" just below can be used without fear of race conditions.
+Those listed as "unsafe" cannot, and are provided for backward
+compatibility only.
 
-import os
+This module also provides some data items to the user:
 
-__all__ = ["mktemp", "TemporaryFile", "tempdir", "gettempprefix"]
+  TMP_MAX  - maximum number of names that will be tried before
+             giving up.
+  template - the default prefix for all temporary names.
+             You may change this to control the default prefix.
+  tempdir  - If this is set to a string before the first use of
+             any routine from this module, it will be considered as
+             another candidate location to store temporary files.
+"""
+
+__all__ = [
+    "NamedTemporaryFile", "TemporaryFile", # high level safe interfaces
+    "mkstemp", "mkdtemp",                  # low level safe interfaces
+    "mktemp",                              # deprecated unsafe interface
+    "TMP_MAX", "gettempprefix",            # constants
+    "tempdir", "gettempdir"
+   ]
+
+
+# Imports.
+
+import os as _os
+import errno as _errno
+from random import Random as _Random
+
+if _os.name == 'mac':
+    import macfs as _macfs
+    import MACFS as _MACFS
+
+try:
+    import fcntl as _fcntl
+    def _set_cloexec(fd):
+        flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)
+        if flags >= 0:
+            # flags read successfully, modify
+            flags |= _fcntl.FD_CLOEXEC
+            _fcntl.fcntl(fd, _fcntl.F_SETFD, flags)
+except (ImportError, AttributeError):
+    def _set_cloexec(fd):
+        pass
+
+try:
+    import thread as _thread
+    _allocate_lock = _thread.allocate_lock
+except (ImportError, AttributeError):
+    class _allocate_lock:
+        def acquire(self):
+            pass
+        release = acquire
+
+_text_openflags = _os.O_RDWR | _os.O_CREAT | _os.O_EXCL
+if hasattr(_os, 'O_NOINHERIT'): _text_openflags |= _os.O_NOINHERIT
+if hasattr(_os, 'O_NOFOLLOW'): _text_openflags |= _os.O_NOFOLLOW
+
+_bin_openflags = _text_openflags
+if hasattr(_os, 'O_BINARY'): _bin_openflags |= _os.O_BINARY
+
+if hasattr(_os, 'TMP_MAX'):
+    TMP_MAX = _os.TMP_MAX
+else:
+    TMP_MAX = 10000
+
+if _os.name == 'nt':
+    template = '~t' # cater to eight-letter limit
+else:
+    template = "tmp"
 
-# Parameters that the caller may set to override the defaults
 tempdir = None
-template = None
 
-def gettempdir():
-    """Function to calculate the directory to use."""
-    global tempdir
-    if tempdir is not None:
-        return tempdir
-
-    # _gettempdir_inner deduces whether a candidate temp dir is usable by
-    # trying to create a file in it, and write to it.  If that succeeds,
-    # great, it closes the file and unlinks it.  There's a race, though:
-    # the *name* of the test file it tries is the same across all threads
-    # under most OSes (Linux is an exception), and letting multiple threads
-    # all try to open, write to, close, and unlink a single file can cause
-    # a variety of bogus errors (e.g., you cannot unlink a file under
-    # Windows if anyone has it open, and two threads cannot create the
-    # same file in O_EXCL mode under Unix).  The simplest cure is to serialize
-    # calls to _gettempdir_inner.  This isn't a real expense, because the
-    # first thread to succeed sets the global tempdir, and all subsequent
-    # calls to gettempdir() reuse that without trying _gettempdir_inner.
-    _tempdir_lock.acquire()
+# Internal routines.
+
+_once_lock = _allocate_lock()
+
+def _once(var, initializer):
+    """Wrapper to execute an initialization operation just once,
+    even if multiple threads reach the same point at the same time.
+
+    var is the name (as a string) of the variable to be entered into
+    the current global namespace.
+
+    initializer is a callable which will return the appropriate initial
+    value for variable.  It will be called only if variable is not
+    present in the global namespace, or its current value is None.
+
+    Do not call _once from inside an initializer routine, it will deadlock.
+    """
+
+    vars = globals()
+    lock = _once_lock
+
+    # Check first outside the lock.
+    if var in vars and vars[var] is not None:
+        return
     try:
-        return _gettempdir_inner()
+        lock.acquire()
+        # Check again inside the lock.
+        if var in vars and vars[var] is not None:
+            return
+        vars[var] = initializer()
     finally:
-        _tempdir_lock.release()
+        lock.release()
+
+class _RandomNameSequence:
+    """An instance of _RandomNameSequence generates an endless
+    sequence of unpredictable strings which can safely be incorporated
+    into file names.  Each string is six characters long.  Multiple
+    threads can safely use the same instance at the same time.
+
+    _RandomNameSequence is an iterator."""
+
+    characters = (  "abcdefghijklmnopqrstuvwxyz"
+                  + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+                  + "0123456789-_")
+
+    def __init__(self):
+        self.mutex = _allocate_lock()
+        self.rng = _Random()
+        self.normcase = _os.path.normcase
+    def __iter__(self):
+        return self
+
+    def next(self):
+        m = self.mutex
+        c = self.characters
+        r = self.rng
 
-def _gettempdir_inner():
-    """Function to calculate the directory to use."""
-    global tempdir
-    if tempdir is not None:
-        return tempdir
-    try:
-        pwd = os.getcwd()
-    except (AttributeError, os.error):
-        pwd = os.curdir
-    attempdirs = ['/tmp', '/var/tmp', '/usr/tmp', pwd]
-    if os.name == 'nt':
-        attempdirs.insert(0, 'C:\\TEMP')
-        attempdirs.insert(0, '\\TEMP')
-    elif os.name == 'mac':
-        import macfs, MACFS
         try:
-            refnum, dirid = macfs.FindFolder(MACFS.kOnSystemDisk,
-                                             MACFS.kTemporaryFolderType, 1)
-            dirname = macfs.FSSpec((refnum, dirid, '')).as_pathname()
-            attempdirs.insert(0, dirname)
-        except macfs.error:
-            pass
-    elif os.name == 'riscos':
-        scrapdir = os.getenv('Wimp$ScrapDir')
-        if scrapdir:
-            attempdirs.insert(0, scrapdir)
+            m.acquire()
+            letters = ''.join([r.choice(c), r.choice(c), r.choice(c),
+                               r.choice(c), r.choice(c), r.choice(c)])
+        finally:
+            m.release()
+
+        return self.normcase(letters)
+
+def _candidate_tempdir_list():
+    """Generate a list of candidate temporary directories which
+    _get_default_tempdir will try."""
+
+    dirlist = []
+
+    # First, try the environment.
     for envname in 'TMPDIR', 'TEMP', 'TMP':
-        if envname in os.environ:
-            attempdirs.insert(0, os.environ[envname])
-    testfile = gettempprefix() + 'test'
-    for dir in attempdirs:
+        dirname = _os.getenv(envname)
+        if dirname: dirlist.append(dirname)
+
+    # Failing that, try OS-specific locations.
+    if _os.name == 'mac':
         try:
-            filename = os.path.join(dir, testfile)
-            if os.name == 'posix':
-                try:
-                    fd = os.open(filename,
-                                 os.O_RDWR | os.O_CREAT | os.O_EXCL, 0700)
-                except OSError:
-                    pass
-                else:
-                    fp = os.fdopen(fd, 'w')
-                    fp.write('blat')
-                    fp.close()
-                    os.unlink(filename)
-                    del fp, fd
-                    tempdir = dir
-                    break
-            else:
-                fp = open(filename, 'w')
+            refnum, dirid = _macfs.FindFolder(_MACFS.kOnSystemDisk,
+                                              _MACFS.kTemporaryFolderType, 1)
+            dirname = _macfs.FSSpec((refnum, dirid, '')).as_pathname()
+            dirlist.append(dirname)
+        except _macfs.error:
+            pass
+    elif _os.name == 'riscos':
+        dirname = _os.getenv('Wimp$ScrapDir')
+        if dirname: dirlist.append(dirname)
+    elif _os.name == 'nt':
+        dirlist.extend([ r'c:\temp', r'c:\tmp', r'\temp', r'\tmp' ])
+    else:
+        dirlist.extend([ '/tmp', '/var/tmp', '/usr/tmp' ])
+
+    # As a last resort, the current directory.
+    try:
+        dirlist.append(_os.getcwd())
+    except (AttributeError, _os.error):
+        dirlist.append(_os.curdir)
+
+    return dirlist
+    
+def _get_default_tempdir():
+    """Calculate the default directory to use for temporary files.
+    This routine should be called through '_once' (see above) as we
+    do not want multiple threads attempting this calculation simultaneously.
+
+    We determine whether or not a candidate temp dir is usable by
+    trying to create and write to a file in that directory.  If this
+    is successful, the test file is deleted.  To prevent denial of
+    service, the name of the test file must be randomized."""
+
+    namer = _RandomNameSequence()
+    dirlist = _candidate_tempdir_list()
+    flags = _text_openflags
+
+    for dir in dirlist:
+        if dir != _os.curdir:
+            dir = _os.path.normcase(_os.path.abspath(dir))
+        # Try only a few names per directory.
+        for seq in xrange(100):
+            name = namer.next()
+            filename = _os.path.join(dir, name)
+            try:
+                fd = _os.open(filename, flags, 0600)
+                fp = _os.fdopen(fd, 'w')
                 fp.write('blat')
                 fp.close()
-                os.unlink(filename)
-                tempdir = dir
-                break
-        except IOError:
-            pass
-    if tempdir is None:
-        msg = "Can't find a usable temporary directory amongst " + `attempdirs`
-        raise IOError, msg
-    return tempdir
+                _os.unlink(filename)
+                del fp, fd
+                return dir
+            except (OSError, IOError), e:
+                if e[0] != _errno.EEXIST:
+                    break # no point trying more names in this directory
+                pass
+    raise IOError, (_errno.ENOENT,
+                    ("No usable temporary directory found in %s" % dirlist))
 
+def _get_candidate_names():
+    """Common setup sequence for all user-callable interfaces."""
 
-# template caches the result of gettempprefix, for speed, when possible.
-# XXX unclear why this isn't "_template"; left it "template" for backward
-# compatibility.
-if os.name == "posix":
-    # We don't try to cache the template on posix:  the pid may change on us
-    # between calls due to a fork, and on Linux the pid changes even for
-    # another thread in the same process.  Since any attempt to keep the
-    # cache in synch would have to call os.getpid() anyway in order to make
-    # sure the pid hasn't changed between calls, a cache wouldn't save any
-    # time.  In addition, a cache is difficult to keep correct with the pid
-    # changing willy-nilly, and earlier attempts proved buggy (races).
-    template = None
-
-# Else the pid never changes, so gettempprefix always returns the same
-# string.
-elif os.name == "nt":
-    template = '~' + `os.getpid()` + '-'
-elif os.name in ('mac', 'riscos'):
-    template = 'Python-Tmp-'
-else:
-    template = 'tmp' # XXX might choose a better one
+    _once('_name_sequence', _RandomNameSequence)
+    return _name_sequence
+
+
+def _mkstemp_inner(dir, pre, suf, flags):
+    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile."""
+
+    names = _get_candidate_names()
+
+    for seq in xrange(TMP_MAX):
+        name = names.next()
+        file = _os.path.join(dir, pre + name + suf)
+        try:
+            fd = _os.open(file, flags, 0600)
+            _set_cloexec(fd)
+            return (fd, file)
+        except OSError, e:
+            if e.errno == _errno.EEXIST:
+                continue # try again
+            raise
+
+    raise IOError, (_errno.EEXIST, "No usable temporary file name found")
+    
+
+# User visible interfaces.
 
 def gettempprefix():
-    """Function to calculate a prefix of the filename to use.
+    """Accessor for tempdir.template."""
+    return template
+
+def gettempdir():
+    """Accessor for tempdir.tempdir."""
+    _once('tempdir', _get_default_tempdir)
+    return tempdir
+
+def mkstemp(suffix="", prefix=template, dir=gettempdir(), binary=1):
+    """mkstemp([suffix, [prefix, [dir, [binary]]]])
+    User-callable function to create and return a unique temporary
+    file.  The return value is a pair (fd, name) where fd is the
+    file descriptor returned by os.open, and name is the filename.
+
+    If 'suffix' is specified, the file name will end with that suffix,
+    otherwise there will be no suffix.
+
+    If 'prefix' is specified, the file name will begin with that prefix,
+    otherwise a default prefix is used.
+
+    If 'dir' is specified, the file will be created in that directory,
+    otherwise a default directory is used.
+
+    If 'binary' is specified and false, the file is opened in binary
+    mode.  Otherwise, the file is opened in text mode.  On some
+    operating systems, this makes no difference.
+
+    The file is readable and writable only by the creating user ID.
+    If the operating system uses permission bits to indicate whether a
+    file is executable, the file is executable by no one. The file
+    descriptor is not inherited by children of this process.
 
-    This incorporates the current process id on systems that support such a
-    notion, so that concurrent processes don't generate the same prefix.
+    Caller is responsible for deleting the file when done with it.
     """
 
-    global template
-    if template is None:
-        return '@' + `os.getpid()` + '.'
+    if binary:
+        flags = _bin_openflags
     else:
-        return template
+        flags = _text_openflags
 
+    return _mkstemp_inner(dir, prefix, suffix, flags)
 
-def mktemp(suffix=""):
-    """User-callable function to return a unique temporary file name."""
-    dir = gettempdir()
-    pre = gettempprefix()
-    while 1:
-        i = _counter.get_next()
-        file = os.path.join(dir, pre + str(i) + suffix)
-        if not os.path.exists(file):
+
+def mkdtemp(suffix="", prefix=template, dir=gettempdir()):
+    """mkdtemp([suffix, [prefix, [dir]]])
+    User-callable function to create and return a unique temporary
+    directory.  The return value is the pathname of the directory.
+
+    Arguments are as for mkstemp, except that the 'binary' argument is
+    not accepted.
+
+    The directory is readable, writable, and searchable only by the
+    creating user.
+
+    Caller is responsible for deleting the directory when done with it.
+    """
+
+    names = _get_candidate_names()
+    
+    for seq in xrange(TMP_MAX):
+        name = names.next()
+        file = _os.path.join(dir, prefix + name + suffix)
+        try:
+            _os.mkdir(file, 0700)
             return file
+        except OSError, e:
+            if e.errno == _errno.EEXIST:
+                continue # try again
+            raise
 
+    raise IOError, (_errno.EEXIST, "No usable temporary directory name found")
 
-class TemporaryFileWrapper:
-    """Temporary file wrapper
+def mktemp(suffix="", prefix=template, dir=gettempdir()):
+    """mktemp([suffix, [prefix, [dir]]])
+    User-callable function to return a unique temporary file name.  The
+    file is not created.
 
-    This class provides a wrapper around files opened for temporary use.
-    In particular, it seeks to automatically remove the file when it is
-    no longer needed.
+    Arguments are as for mkstemp, except that the 'binary' argument is
+    not accepted.
+
+    This function is unsafe and should not be used.  The file name
+    refers to a file that did not exist at some point, but by the time
+    you get around to creating it, someone else may have beaten you to
+    the punch.
     """
 
-    # Cache the unlinker so we don't get spurious errors at shutdown
-    # when the module-level "os" is None'd out.  Note that this must
-    # be referenced as self.unlink, because the name TemporaryFileWrapper
-    # may also get None'd out before __del__ is called.
-    unlink = os.unlink
+    from warnings import warn as _warn
+    _warn("mktemp is a potential security risk to your program",
+          RuntimeWarning, stacklevel=2)
 
-    def __init__(self, file, path):
-        self.file = file
-        self.path = path
-        self.close_called = 0
+    names = _get_candidate_names()
+    for seq in xrange(TMP_MAX):
+        name = names.next()
+        file = _os.path.join(dir, prefix + name + suffix)
+        if not _os.path.exists(file):
+            return file
+
+    raise IOError, (_errno.EEXIST, "No usable temporary filename found")
 
-    def close(self):
-        if not self.close_called:
-            self.close_called = 1
-            self.file.close()
-            self.unlink(self.path)
+class _TemporaryFileWrapper:
+    """Temporary file wrapper
+
+    This class provides a wrapper around files opened for
+    temporary use.  In particular, it seeks to automatically
+    remove the file when it is no longer needed.
+    """
 
-    def __del__(self):
-        self.close()
+    def __init__(self, file, name):
+        self.file = file
+        self.name = name
+        self.close_called = 0
 
     def __getattr__(self, name):
         file = self.__dict__['file']
@@ -180,93 +363,81 @@ class TemporaryFileWrapper:
             setattr(self, name, a)
         return a
 
-try:
-    import fcntl as _fcntl
-    def _set_cloexec(fd, flag=_fcntl.FD_CLOEXEC):
-        flags = _fcntl.fcntl(fd, _fcntl.F_GETFD, 0)
-        if flags >= 0:
-            # flags read successfully, modify
-            flags |= flag
-            _fcntl.fcntl(fd, _fcntl.F_SETFD, flags)
-except (ImportError, AttributeError):
-    def _set_cloexec(fd):
-        pass
-
-def TemporaryFile(mode='w+b', bufsize=-1, suffix=""):
-    """Create and return a temporary file (opened read-write by default)."""
-    name = mktemp(suffix)
-    if os.name == 'posix':
-        # Unix -- be very careful
-        fd = os.open(name, os.O_RDWR|os.O_CREAT|os.O_EXCL, 0700)
-        _set_cloexec(fd)
-        try:
-            os.unlink(name)
-            return os.fdopen(fd, mode, bufsize)
-        except:
-            os.close(fd)
-            raise
-    elif os.name == 'nt':
-        # Windows -- can't unlink an open file, but O_TEMPORARY creates a
-        # file that "deletes itself" when the last handle is closed.
-        # O_NOINHERIT ensures processes created via spawn() don't get a
-        # handle to this too.  That would be a security hole, and, on my
-        # Win98SE box, when an O_TEMPORARY file is inherited by a spawned
-        # process, the fd in the spawned process seems to lack the
-        # O_TEMPORARY flag, so the file doesn't go away by magic then if the
-        # spawning process closes it first.
-        flags = (os.O_RDWR | os.O_CREAT | os.O_EXCL |
-                 os.O_TEMPORARY | os.O_NOINHERIT)
-        if 'b' in mode:
-            flags |= os.O_BINARY
-        fd = os.open(name, flags, 0700)
-        return os.fdopen(fd, mode, bufsize)
-    else:
-        # Assume we can't unlink a file that's still open, or arrange for
-        # an automagically self-deleting file -- use wrapper.
-        file = open(name, mode, bufsize)
-        return TemporaryFileWrapper(file, name)
-
-# In order to generate unique names, mktemp() uses _counter.get_next().
-# This returns a unique integer on each call, in a threadsafe way (i.e.,
-# multiple threads will never see the same integer).  The integer will
-# usually be a Python int, but if _counter.get_next() is called often
-# enough, it will become a Python long.
-# Note that the only names that survive this next block of code
-# are "_counter" and "_tempdir_lock".
-
-class _ThreadSafeCounter:
-    def __init__(self, mutex, initialvalue=0):
-        self.mutex = mutex
-        self.i = initialvalue
-
-    def get_next(self):
-        self.mutex.acquire()
-        result = self.i
-        try:
-            newi = result + 1
-        except OverflowError:
-            newi = long(result) + 1
-        self.i = newi
-        self.mutex.release()
-        return result
+    # NT provides delete-on-close as a primitive, so we don't need
+    # the wrapper to do anything special.  We still use it so that
+    # file.name is useful (i.e. not "(fdopen)") with NamedTemporaryFile.
+    if _os.name != 'nt':
+
+        # Cache the unlinker so we don't get spurious errors at
+        # shutdown when the module-level "os" is None'd out.  Note
+        # that this must be referenced as self.unlink, because the
+        # name TemporaryFileWrapper may also get None'd out before
+        # __del__ is called.
+        unlink = _os.unlink
+
+        def close(self):
+            if not self.close_called:
+                self.close_called = 1
+                self.file.close()
+                self.unlink(self.name)
+
+        def __del__(self):
+            self.close()
+
+def NamedTemporaryFile(mode='w+b', bufsize=-1, suffix="",
+                       prefix=template, dir=gettempdir()):
+    """Create and return a temporary file.
+    Arguments:
+    'prefix', 'suffix', 'dir' -- as for mkstemp.
+    'mode' -- the mode argument to os.fdopen (default "w+b").
+    'bufsize' -- the buffer size argument to os.fdopen (default -1).
+    The file is created as mkstemp() would do it.
+
+    Returns a file object; the name of the file is accessible as
+    file.name.  The file will be automatically deleted when it is
+    closed.
+    """
 
-try:
-    import thread
+    bin = 'b' in mode
+    if bin: flags = _bin_openflags
+    else:   flags = _text_openflags
 
-except ImportError:
-    class _DummyMutex:
-        def acquire(self):
-            pass
+    # Setting O_TEMPORARY in the flags causes the OS to delete
+    # the file when it is closed.  This is only supported by Windows.
+    if _os.name == 'nt':
+        flags |= _os.O_TEMPORARY
 
-        release = acquire
+    (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
+    file = _os.fdopen(fd, mode, bufsize)
+    return _TemporaryFileWrapper(file, name)
 
-    _counter = _ThreadSafeCounter(_DummyMutex())
-    _tempdir_lock = _DummyMutex()
-    del _DummyMutex
+if _os.name != 'posix':
+    # On non-POSIX systems, assume that we cannot unlink a file while
+    # it is open.
+    TemporaryFile = NamedTemporaryFile
 
 else:
-    _counter = _ThreadSafeCounter(thread.allocate_lock())
-    _tempdir_lock = thread.allocate_lock()
-    del thread
-
-del _ThreadSafeCounter
+    def TemporaryFile(mode='w+b', bufsize=-1, suffix="",
+                      prefix=template, dir=gettempdir()):
+        """Create and return a temporary file.
+        Arguments:
+        'prefix', 'suffix', 'directory' -- as for mkstemp.
+        'mode' -- the mode argument to os.fdopen (default "w+b").
+        'bufsize' -- the buffer size argument to os.fdopen (default -1).
+        The file is created as mkstemp() would do it.
+
+        Returns a file object.  The file has no name, and will cease to
+        exist when it is closed.
+        """
+
+        bin = 'b' in mode
+        if bin: flags = _bin_openflags
+        else:   flags = _text_openflags
+
+        (fd, name) = _mkstemp_inner(dir, prefix, suffix, flags)
+        try:
+            _os.unlink(name)
+            return _os.fdopen(fd, mode, bufsize)
+        except:
+            _os.close(fd)
+            raise
index 286e79cfa459ee7053d414db89919d16067bb790..d96aae7f7e76bf5ec0d64d1bae20b258c994e629 100644 (file)
-# SF bug #476138: tempfile behavior across platforms
-# Ensure that a temp file can be closed any number of times without error.
+# tempfile.py unit tests.
 
 import tempfile
+import os
+import sys
+import re
+import errno
+import warnings
 
-f = tempfile.TemporaryFile("w+b")
-f.write('abc\n')
-f.close()
-f.close()
-f.close()
+import unittest
+from test import test_support
+
+if hasattr(os, 'stat'):
+    import stat
+    has_stat = 1
+else:
+    has_stat = 0
+
+has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
+
+# This is organized as one test for each chunk of code in tempfile.py,
+# in order of their appearance in the file.  Testing which requires
+# threads is not done here.
+
+# Common functionality.
+class TC(unittest.TestCase):
+
+    str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")
+
+    def failOnException(self, what, ei=None):
+        if ei is None:
+            ei = sys.exc_info()
+        self.fail("%s raised %s: %s" % (what, ei[0], ei[1]))
+
+    def nameCheck(self, name, dir, pre, suf):
+        (ndir, nbase) = os.path.split(name)
+        npre  = nbase[:len(pre)]
+        nsuf  = nbase[len(nbase)-len(suf):]
+
+        self.assertEqual(ndir, dir,
+                         "file '%s' not in directory '%s'" % (name, dir))
+        self.assertEqual(npre, pre,
+                         "file '%s' does not begin with '%s'" % (nbase, pre))
+        self.assertEqual(nsuf, suf,
+                         "file '%s' does not end with '%s'" % (nbase, suf))
+
+        nbase = nbase[len(pre):len(nbase)-len(suf)]
+        self.assert_(self.str_check.match(nbase),
+                     "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
+                     % nbase)
+
+test_classes = []
+
+class test_exports(TC):
+    def test_exports(self):
+        """There are no surprising symbols in the tempfile module"""
+        dict = tempfile.__dict__
+
+        expected = {
+            "NamedTemporaryFile" : 1,
+            "TemporaryFile" : 1,
+            "mkstemp" : 1,
+            "mkdtemp" : 1,
+            "mktemp" : 1,
+            "TMP_MAX" : 1,
+            "gettempprefix" : 1,
+            "gettempdir" : 1,
+            "tempdir" : 1,
+            "template" : 1
+        }
+
+        unexp = []
+        for key in dict:
+            if key[0] != '_' and key not in expected:
+                unexp.append(key)
+        self.failUnless(len(unexp) == 0,
+                        "unexpected keys: %s" % unexp)
+
+test_classes.append(test_exports)
+
+
+class test__once(TC):
+    """Test the internal function _once."""
+
+    def setUp(self):
+        tempfile.once_var = None
+        self.already_called = 0
+
+    def tearDown(self):
+        del tempfile.once_var
+
+    def callMeOnce(self):
+        self.failIf(self.already_called, "callMeOnce called twice")
+        self.already_called = 1
+        return 24
+
+    def do_once(self):
+        tempfile._once('once_var', self.callMeOnce)
+
+    def test_once_initializes(self):
+        """_once initializes its argument"""
+
+        self.do_once()
+
+        self.assertEqual(tempfile.once_var, 24,
+                         "once_var=%d, not 24" % tempfile.once_var)
+        self.assertEqual(self.already_called, 1,
+                         "already_called=%d, not 1" % self.already_called)
+
+    def test_once_means_once(self):
+        """_once calls the callback just once"""
+
+        self.do_once()
+        self.do_once()
+        self.do_once()
+        self.do_once()
+
+    def test_once_namespace_safe(self):
+        """_once does not modify anything but its argument"""
+
+        env_copy = tempfile.__dict__.copy()
+
+        self.do_once()
+
+        env = tempfile.__dict__
+
+        a = env.keys()
+        a.sort()
+        b = env_copy.keys()
+        b.sort()
+
+        self.failIf(len(a) != len(b))
+        for i in xrange(len(a)):
+            self.failIf(a[i] != b[i])
+
+            key = a[i]
+            if key != 'once_var':
+                self.failIf(env[key] != env_copy[key])
+
+test_classes.append(test__once)
+
+
+class test__RandomNameSequence(TC):
+    """Test the internal iterator object _RandomNameSequence."""
+
+    def setUp(self):
+        self.r = tempfile._RandomNameSequence()
+
+    def test_get_six_char_str(self):
+        """_RandomNameSequence returns a six-character string"""
+        s = self.r.next()
+        self.nameCheck(s, '', '', '')
+
+    def test_many(self):
+        """_RandomNameSequence returns no duplicate strings (stochastic)"""
+
+        dict = {}
+        r = self.r
+        for i in xrange(1000):
+            s = r.next()
+            self.nameCheck(s, '', '', '')
+            self.failIf(s in dict)
+            dict[s] = 1
+
+    def test_supports_iter(self):
+        """_RandomNameSequence supports the iterator protocol"""
+
+        i = 0
+        r = self.r
+        try:
+            for s in r:
+                i += 1
+                if i == 20:
+                    break
+        except:
+            failOnException("iteration")
+
+test_classes.append(test__RandomNameSequence)
+
+
+class test__candidate_tempdir_list(TC):
+    """Test the internal function _candidate_tempdir_list."""
+
+    def test_nonempty_list(self):
+        """_candidate_tempdir_list returns a nonempty list of strings"""
+
+        cand = tempfile._candidate_tempdir_list()
+
+        self.failIf(len(cand) == 0)
+        for c in cand:
+            self.assert_(isinstance(c, basestring),
+                         "%s is not a string" % c)
+
+    def test_wanted_dirs(self):
+        """_candidate_tempdir_list contains the expected directories"""
+
+        # Make sure the interesting environment variables are all set.
+        added = []
+        try:
+            for envname in 'TMPDIR', 'TEMP', 'TMP':
+                dirname = os.getenv(envname)
+                if not dirname:
+                    os.environ[envname] = os.path.abspath(envname)
+                    added.append(envname)
+
+            cand = tempfile._candidate_tempdir_list()
+
+            for envname in 'TMPDIR', 'TEMP', 'TMP':
+                dirname = os.getenv(envname)
+                if not dirname: raise ValueError
+                self.assert_(dirname in cand)
+
+            try:
+                dirname = os.getcwd()
+            except (AttributeError, os.error):
+                dirname = os.curdir
+
+            self.assert_(dirname in cand)
+
+            # Not practical to try to verify the presence of OS-specific
+            # paths in this list.
+        finally:
+            for p in added:
+                del os.environ[p]
+
+test_classes.append(test__candidate_tempdir_list)
+
+
+# We test _get_default_tempdir by testing gettempdir.
+
+
+class test__get_candidate_names(TC):
+    """Test the internal function _get_candidate_names."""
+
+    def test_retval(self):
+        """_get_candidate_names returns a _RandomNameSequence object"""
+        obj = tempfile._get_candidate_names()
+        self.assert_(isinstance(obj, tempfile._RandomNameSequence))
+
+    def test_same_thing(self):
+        """_get_candidate_names always returns the same object"""
+        a = tempfile._get_candidate_names()
+        b = tempfile._get_candidate_names()
+
+        self.assert_(a is b)
+
+test_classes.append(test__get_candidate_names)
+
+
+class test__mkstemp_inner(TC):
+    """Test the internal function _mkstemp_inner."""
+
+    class mkstemped:
+        _bflags = tempfile._bin_openflags
+        _tflags = tempfile._text_openflags
+        _close = os.close
+        _unlink = os.unlink
+
+        def __init__(self, dir, pre, suf, bin):
+            if bin: flags = self._bflags
+            else:   flags = self._tflags
+
+            (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags)
+
+        def write(self, str):
+            os.write(self.fd, str)
+
+        def __del__(self):
+            self._close(self.fd)
+            self._unlink(self.name)
+            
+    def do_create(self, dir=None, pre="", suf="", bin=1):
+        if dir is None:
+            dir = tempfile.gettempdir()
+        try:
+            file = self.mkstemped(dir, pre, suf, bin)
+        except:
+            self.failOnException("_mkstemp_inner")
+
+        self.nameCheck(file.name, dir, pre, suf)
+        return file
+
+    def test_basic(self):
+        """_mkstemp_inner can create files"""
+        self.do_create().write("blat")
+        self.do_create(pre="a").write("blat")
+        self.do_create(suf="b").write("blat")
+        self.do_create(pre="a", suf="b").write("blat")
+        self.do_create(pre="aa", suf=".txt").write("blat")
+
+    def test_basic_many(self):
+        """_mkstemp_inner can create many files (stochastic)"""
+        extant = range(1000)
+        for i in extant:
+            extant[i] = self.do_create(pre="aa")
+
+    def test_choose_directory(self):
+        """_mkstemp_inner can create files in a user-selected directory"""
+        dir = tempfile.mkdtemp()
+        try:
+            self.do_create(dir=dir).write("blat")
+        finally:
+            os.rmdir(dir)
+
+    def test_file_mode(self):
+        """_mkstemp_inner creates files with the proper mode"""
+        if not has_stat:
+            return            # ugh, can't use TestSkipped.
+
+        file = self.do_create()
+        mode = stat.S_IMODE(os.stat(file.name).st_mode)
+        self.assertEqual(mode, 0600)
+
+    def test_noinherit(self):
+        """_mkstemp_inner file handles are not inherited by child processes"""
+        # FIXME: Find a way to test this on Windows.
+        if os.name != 'posix':
+            return            # ugh, can't use TestSkipped.
+
+        file = self.do_create()
+
+        # We have to exec something, so that FD_CLOEXEC will take
+        # effect.  The sanest thing to try is /bin/sh; we can easily
+        # instruct it to attempt to write to the fd and report success
+        # or failure.  Unfortunately, sh syntax does not permit use of
+        # fds numerically larger than 9; abandon this test if so.
+        if file.fd > 9:
+            raise test_support.TestSkipped, 'cannot test with fd %d' % file.fd
+
+        pid = os.fork()
+        if pid:
+            status = os.wait()[1]
+            self.failUnless(os.WIFEXITED(status),
+                            "child process did not exit (status %d)" % status)
+
+            # We want the child to have exited _un_successfully, indicating
+            # failure to write to the closed fd.
+            self.failUnless(os.WEXITSTATUS(status) != 0,
+                            "child process exited successfully")
+
+        else:
+            try:
+                # Throw away stderr.
+                nul = os.open('/dev/null', os.O_RDWR)
+                os.dup2(nul, 2)
+                os.execv('/bin/sh', ['sh', '-c', 'echo blat >&%d' % file.fd])
+            except:
+                os._exit(0)
+
+    def test_textmode(self):
+        """_mkstemp_inner can create files in text mode"""
+        if not has_textmode:
+            return            # ugh, can't use TestSkipped.
+
+        self.do_create(bin=0).write("blat\n")
+        # XXX should test that the file really is a text file
+
+test_classes.append(test__mkstemp_inner)
+
+
+class test_gettempprefix(TC):
+    """Test gettempprefix()."""
+
+    def test_sane_template(self):
+        """gettempprefix returns a nonempty prefix string"""
+        p = tempfile.gettempprefix()
+
+        self.assert_(isinstance(p, basestring))
+        self.assert_(len(p) > 0)
+
+    def test_usable_template(self):
+        """gettempprefix returns a usable prefix string"""
+
+        # Create a temp directory, avoiding use of the prefix.
+        # Then attempt to create a file whose name is
+        # prefix + 'xxxxxx.xxx' in that directory.
+        p = tempfile.gettempprefix() + "xxxxxx.xxx"
+        d = tempfile.mkdtemp(prefix="")
+        try:
+            p = os.path.join(d, p)
+            try:
+                fd = os.open(p, os.O_RDWR | os.O_CREAT)
+            except:
+                self.failOnException("os.open")
+            os.close(fd)
+            os.unlink(p)
+        finally:
+            os.rmdir(d)
+
+test_classes.append(test_gettempprefix)
+
+
+class test_gettempdir(TC):
+    """Test gettempdir()."""
+
+    def test_directory_exists(self):
+        """gettempdir returns a directory which exists"""
+
+        dir = tempfile.gettempdir()
+        self.assert_(os.path.isabs(dir) or dir == os.curdir,
+                     "%s is not an absolute path" % dir)
+        self.assert_(os.path.isdir(dir),
+                     "%s is not a directory" % dir)
+
+    def test_directory_writable(self):
+        """gettempdir returns a directory writable by the user"""
+
+        # sneaky: just instantiate a NamedTemporaryFile, which
+        # defaults to writing into the directory returned by
+        # gettempdir.
+        try:
+            file = tempfile.NamedTemporaryFile()
+            file.write("blat")
+            file.close()
+        except:
+            self.failOnException("create file in %s" % tempfile.gettempdir())
+
+    def test_same_thing(self):
+        """gettempdir always returns the same object"""
+        a = tempfile.gettempdir()
+        b = tempfile.gettempdir()
+
+        self.assert_(a is b)
+
+test_classes.append(test_gettempdir)
+
+
+class test_mkstemp(TC):
+    """Test mkstemp()."""
+    def do_create(self, dir=None, pre="", suf="", ):
+        if dir is None:
+            dir = tempfile.gettempdir()
+        try:
+            (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
+        except:
+            self.failOnException("mkstemp")
+
+        try:
+            self.nameCheck(name, dir, pre, suf)
+        finally:
+            os.close(fd)
+            os.unlink(name)
+
+    def test_basic(self):
+        """mkstemp can create files"""
+        self.do_create()
+        self.do_create(pre="a")
+        self.do_create(suf="b")
+        self.do_create(pre="a", suf="b")
+        self.do_create(pre="aa", suf=".txt")
+
+    def test_choose_directory(self):
+        """mkstemp can create directories in a user-selected directory"""
+        dir = tempfile.mkdtemp()
+        try:
+            self.do_create(dir=dir)
+        finally:
+            os.rmdir(dir)
+
+test_classes.append(test_mkstemp)
+
+
+class test_mkdtemp(TC):
+    """Test mkdtemp()."""
+
+    def do_create(self, dir=None, pre="", suf=""):
+        if dir is None:
+            dir = tempfile.gettempdir()
+        try:
+            name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
+        except:
+            self.failOnException("mkdtemp")
+
+        try:
+            self.nameCheck(name, dir, pre, suf)
+            return name
+        except:
+            os.rmdir(name)
+            raise
+
+    def test_basic(self):
+        """mkdtemp can create directories"""
+        os.rmdir(self.do_create())
+        os.rmdir(self.do_create(pre="a"))
+        os.rmdir(self.do_create(suf="b"))
+        os.rmdir(self.do_create(pre="a", suf="b"))
+        os.rmdir(self.do_create(pre="aa", suf=".txt"))
+        
+    def test_basic_many(self):
+        """mkdtemp can create many directories (stochastic)"""
+        extant = range(1000)
+        try:
+            for i in extant:
+                extant[i] = self.do_create(pre="aa")
+        finally:
+            for i in extant:
+                if(isinstance(i, basestring)):
+                    os.rmdir(i)
+
+    def test_choose_directory(self):
+        """mkdtemp can create directories in a user-selected directory"""
+        dir = tempfile.mkdtemp()
+        try:
+            os.rmdir(self.do_create(dir=dir))
+        finally:
+            os.rmdir(dir)
+
+    def test_mode(self):
+        """mkdtemp creates directories with the proper mode"""
+        if not has_stat:
+            return            # ugh, can't use TestSkipped.
+
+        dir = self.do_create()
+        try:
+            mode = stat.S_IMODE(os.stat(dir).st_mode)
+            self.assertEqual(mode, 0700)
+        finally:
+            os.rmdir(dir)
+
+test_classes.append(test_mkdtemp)
+
+
+class test_mktemp(TC):
+    """Test mktemp()."""
+
+    # For safety, all use of mktemp must occur in a private directory.
+    # We must also suppress the RuntimeWarning it generates.
+    def setUp(self):
+        self.dir = tempfile.mkdtemp()
+        warnings.filterwarnings("ignore",
+                                category=RuntimeWarning,
+                                message="mktemp")
+
+    def tearDown(self):
+        if self.dir:
+            os.rmdir(self.dir)
+            self.dir = None
+        # XXX This clobbers any -W options.
+        warnings.resetwarnings()
+
+    class mktemped:
+        _unlink = os.unlink
+        _bflags = tempfile._bin_openflags
+
+        def __init__(self, dir, pre, suf):
+            self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
+            # Create the file.  This will raise an exception if it's
+            # mysteriously appeared in the meanwhile.
+            os.close(os.open(self.name, self._bflags, 0600))
+
+        def __del__(self):
+            self._unlink(self.name)
+
+    def do_create(self, pre="", suf=""):
+        try:
+            file = self.mktemped(self.dir, pre, suf)
+        except:
+            self.failOnException("mktemp")
+
+        self.nameCheck(file.name, self.dir, pre, suf)
+        return file
+
+    def test_basic(self):
+        """mktemp can choose usable file names"""
+        self.do_create()
+        self.do_create(pre="a")
+        self.do_create(suf="b")
+        self.do_create(pre="a", suf="b")
+        self.do_create(pre="aa", suf=".txt")
+
+    def test_many(self):
+        """mktemp can choose many usable file names (stochastic)"""
+        extant = range(1000)
+        for i in extant:
+            extant[i] = self.do_create(pre="aa")
+
+    def test_warning(self):
+        """mktemp issues a warning when used"""
+        warnings.filterwarnings("error",
+                                category=RuntimeWarning,
+                                message="mktemp")
+        self.assertRaises(RuntimeWarning,
+                          tempfile.mktemp, (), { 'dir': self.dir })
+
+test_classes.append(test_mktemp)
+
+
+# We test _TemporaryFileWrapper by testing NamedTemporaryFile.
+
+
+class test_NamedTemporaryFile(TC):
+    """Test NamedTemporaryFile()."""
+
+    def do_create(self, dir=None, pre="", suf=""):
+        if dir is None:
+            dir = tempfile.gettempdir()
+        try:
+            file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf)
+        except:
+            self.failOnException("NamedTemporaryFile")
+
+        self.nameCheck(file.name, dir, pre, suf)
+        return file
+
+
+    def test_basic(self):
+        """NamedTemporaryFile can create files"""
+        self.do_create()
+        self.do_create(pre="a")
+        self.do_create(suf="b")
+        self.do_create(pre="a", suf="b")
+        self.do_create(pre="aa", suf=".txt")
+
+    def test_creates_named(self):
+        """NamedTemporaryFile creates files with names"""
+        f = tempfile.NamedTemporaryFile()
+        self.failUnless(os.path.exists(f.name),
+                        "NamedTemporaryFile %s does not exist" % f.name)
+
+    def test_del_on_close(self):
+        """A NamedTemporaryFile is deleted when closed"""
+        dir = tempfile.mkdtemp()
+        try:
+            f = tempfile.NamedTemporaryFile(dir=dir)
+            f.write('blat')
+            f.close()
+            self.failIf(os.path.exists(f.name),
+                        "NamedTemporaryFile %s exists after close" % f.name)
+        finally:
+            os.rmdir(dir)
+
+    def test_multiple_close(self):
+        """A NamedTemporaryFile can be closed many times without error"""
+
+        f = tempfile.NamedTemporaryFile()
+        f.write('abc\n')
+        f.close()
+        try:
+            f.close()
+            f.close()
+        except:
+            self.failOnException("close")
+
+    # How to test the mode and bufsize parameters?
+
+test_classes.append(test_NamedTemporaryFile)
+
+
+class test_TemporaryFile(TC):
+    """Test TemporaryFile()."""
+
+    def test_basic(self):
+        """TemporaryFile can create files"""
+        # No point in testing the name params - the file has no name.
+        try:
+            tempfile.TemporaryFile()
+        except:
+            self.failOnException("TemporaryFile")
+
+    def test_has_no_name(self):
+        """TemporaryFile creates files with no names (on this system)"""
+        dir = tempfile.mkdtemp()
+        f = tempfile.TemporaryFile(dir=dir)
+        f.write('blat')
+
+        # Sneaky: because this file has no name, it should not prevent
+        # us from removing the directory it was created in.
+        try:
+            os.rmdir(dir)
+        except:
+            ei = sys.exc_info()
+            # cleanup
+            f.close()
+            os.rmdir(dir)
+            self.failOnException("rmdir", ei)
+
+    def test_multiple_close(self):
+        """A TemporaryFile can be closed many times without error"""
+        f = tempfile.TemporaryFile()
+        f.write('abc\n')
+        f.close()
+        try:
+            f.close()
+            f.close()
+        except:
+            self.failOnException("close")
+
+    # How to test the mode and bufsize parameters?
+
+class dummy_test_TemporaryFile(TC):
+    def test_dummy(self):
+        """TemporaryFile and NamedTemporaryFile are the same (on this system)"""
+        pass
+
+if tempfile.NamedTemporaryFile is tempfile.TemporaryFile:
+    test_classes.append(dummy_test_TemporaryFile)
+else:
+    test_classes.append(test_TemporaryFile)
+
+def test_main():
+    suite = unittest.TestSuite()
+    for c in test_classes:
+        suite.addTest(unittest.makeSuite(c))
+    test_support.run_suite(suite)
+
+if __name__ == "__main__":
+    test_main()