]> granicus.if.org Git - python/commitdiff
bpo-30207: Rename test.test_support to test.support. (#1353)
authorSerhiy Storchaka <storchaka@gmail.com>
Sun, 30 Apr 2017 08:36:58 +0000 (11:36 +0300)
committerGitHub <noreply@github.com>
Sun, 30 Apr 2017 08:36:58 +0000 (11:36 +0300)
To simplify backports from Python 3, the test.test_support module
was converted into a package and renamed to test.support.  The
test.script_helper module was moved into the test.support package.
Names test.test_support and test.script_helper are left as aliases to
test.support and test.support.script_helper.

Doc/library/test.rst
Lib/test/script_helper.py
Lib/test/support/__init__.py [new file with mode: 0644]
Lib/test/support/script_helper.py [new file with mode: 0644]
Lib/test/test_compiler.py
Lib/test/test_linecache.py
Lib/test/test_support.py
Misc/NEWS

index be5a9e9f320f4cd584646addff55e63472a1b884..c32b93dae7966c4e1927abb1b910f38954baadd4 100644 (file)
@@ -15,8 +15,8 @@
 
 
 The :mod:`test` package contains all regression tests for Python as well as the
-modules :mod:`test.test_support` and :mod:`test.regrtest`.
-:mod:`test.test_support` is used to enhance your tests while
+modules :mod:`test.support` and :mod:`test.regrtest`.
+:mod:`test.support` is used to enhance your tests while
 :mod:`test.regrtest` drives the testing suite.
 
 Each module in the :mod:`test` package whose name starts with ``test_`` is a
@@ -54,7 +54,7 @@ stated.
 A basic boilerplate is often used::
 
    import unittest
-   from test import test_support
+   from test import support
 
    class MyTestCase1(unittest.TestCase):
 
@@ -82,10 +82,10 @@ A basic boilerplate is often used::
    ... more test classes ...
 
    def test_main():
-       test_support.run_unittest(MyTestCase1,
-                                 MyTestCase2,
-                                 ... list other tests ...
-                                )
+       support.run_unittest(MyTestCase1,
+                            MyTestCase2,
+                            ... list other tests ...
+                            )
 
    if __name__ == '__main__':
        test_main()
@@ -186,18 +186,19 @@ top-level directory where Python was built. On Windows, executing
 tests.
 
 
-:mod:`test.test_support` --- Utility functions for tests
-========================================================
+:mod:`test.support` --- Utility functions for tests
+===================================================
 
-.. module:: test.test_support
+.. module:: test.support
    :synopsis: Support for Python regression tests.
 
 .. note::
 
    The :mod:`test.test_support` module has been renamed to :mod:`test.support`
-   in Python 3.x.
+   in Python 3.x and 2.7.13.  The name ``test.test_support`` has been retained
+   as an alias in 2.7.
 
-The :mod:`test.test_support` module provides support for Python's regression
+The :mod:`test.support` module provides support for Python's regression
 tests.
 
 This module defines the following exceptions:
@@ -216,7 +217,7 @@ This module defines the following exceptions:
    network connection) is not available. Raised by the :func:`requires`
    function.
 
-The :mod:`test.test_support` module defines the following constants:
+The :mod:`test.support` module defines the following constants:
 
 
 .. data:: verbose
@@ -241,7 +242,7 @@ The :mod:`test.test_support` module defines the following constants:
    Set to a name that is safe to use as the name of a temporary file.  Any
    temporary file that is created should be closed and unlinked (removed).
 
-The :mod:`test.test_support` module defines the following functions:
+The :mod:`test.support` module defines the following functions:
 
 
 .. function:: forget(module_name)
@@ -284,7 +285,7 @@ The :mod:`test.test_support` module defines the following functions:
    following :func:`test_main` function::
 
       def test_main():
-          test_support.run_unittest(__name__)
+          support.run_unittest(__name__)
 
    This will run all tests defined in the named module.
 
@@ -433,7 +434,7 @@ The :mod:`test.test_support` module defines the following functions:
    .. versionadded:: 2.7
 
 
-The :mod:`test.test_support` module defines the following classes:
+The :mod:`test.support` module defines the following classes:
 
 .. class:: TransientResource(exc[, **kwargs])
 
index 6be47bd4ffe07d8487f0c00ee61a83ec9f26dcc1..99ec114eefe72eed9bffce2c3cbb6033f04cd2c2 100644 (file)
@@ -1,170 +1 @@
-# Common utility functions used by various script execution tests
-#  e.g. test_cmd_line, test_cmd_line_script and test_runpy
-
-import sys
-import os
-import re
-import os.path
-import tempfile
-import subprocess
-import py_compile
-import contextlib
-import shutil
-try:
-    import zipfile
-except ImportError:
-    # If Python is build without Unicode support, importing _io will
-    # fail, which, in turn, means that zipfile cannot be imported
-    # Most of this module can then still be used.
-    pass
-
-from test.test_support import strip_python_stderr
-
-# Executing the interpreter in a subprocess
-def _assert_python(expected_success, *args, **env_vars):
-    cmd_line = [sys.executable]
-    if not env_vars:
-        cmd_line.append('-E')
-    cmd_line.extend(args)
-    # Need to preserve the original environment, for in-place testing of
-    # shared library builds.
-    env = os.environ.copy()
-    env.update(env_vars)
-    p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
-                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-                         env=env)
-    try:
-        out, err = p.communicate()
-    finally:
-        subprocess._cleanup()
-        p.stdout.close()
-        p.stderr.close()
-    rc = p.returncode
-    err =  strip_python_stderr(err)
-    if (rc and expected_success) or (not rc and not expected_success):
-        raise AssertionError(
-            "Process return code is %d, "
-            "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
-    return rc, out, err
-
-def assert_python_ok(*args, **env_vars):
-    """
-    Assert that running the interpreter with `args` and optional environment
-    variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
-    """
-    return _assert_python(True, *args, **env_vars)
-
-def assert_python_failure(*args, **env_vars):
-    """
-    Assert that running the interpreter with `args` and optional environment
-    variables `env_vars` fails and return a (return code, stdout, stderr) tuple.
-    """
-    return _assert_python(False, *args, **env_vars)
-
-def python_exit_code(*args):
-    cmd_line = [sys.executable, '-E']
-    cmd_line.extend(args)
-    with open(os.devnull, 'w') as devnull:
-        return subprocess.call(cmd_line, stdout=devnull,
-                                stderr=subprocess.STDOUT)
-
-def spawn_python(*args, **kwargs):
-    cmd_line = [sys.executable, '-E']
-    cmd_line.extend(args)
-    return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
-                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
-                            **kwargs)
-
-def kill_python(p):
-    p.stdin.close()
-    data = p.stdout.read()
-    p.stdout.close()
-    # try to cleanup the child so we don't appear to leak when running
-    # with regrtest -R.
-    p.wait()
-    subprocess._cleanup()
-    return data
-
-def run_python(*args, **kwargs):
-    if __debug__:
-        p = spawn_python(*args, **kwargs)
-    else:
-        p = spawn_python('-O', *args, **kwargs)
-    stdout_data = kill_python(p)
-    return p.wait(), stdout_data
-
-# Script creation utilities
-@contextlib.contextmanager
-def temp_dir():
-    dirname = tempfile.mkdtemp()
-    dirname = os.path.realpath(dirname)
-    try:
-        yield dirname
-    finally:
-        shutil.rmtree(dirname)
-
-def make_script(script_dir, script_basename, source):
-    script_filename = script_basename+os.extsep+'py'
-    script_name = os.path.join(script_dir, script_filename)
-    script_file = open(script_name, 'w')
-    script_file.write(source)
-    script_file.close()
-    return script_name
-
-def compile_script(script_name):
-    py_compile.compile(script_name, doraise=True)
-    if __debug__:
-        compiled_name = script_name + 'c'
-    else:
-        compiled_name = script_name + 'o'
-    return compiled_name
-
-def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
-    zip_filename = zip_basename+os.extsep+'zip'
-    zip_name = os.path.join(zip_dir, zip_filename)
-    zip_file = zipfile.ZipFile(zip_name, 'w')
-    if name_in_zip is None:
-        name_in_zip = os.path.basename(script_name)
-    zip_file.write(script_name, name_in_zip)
-    zip_file.close()
-    #if test.test_support.verbose:
-    #    zip_file = zipfile.ZipFile(zip_name, 'r')
-    #    print 'Contents of %r:' % zip_name
-    #    zip_file.printdir()
-    #    zip_file.close()
-    return zip_name, os.path.join(zip_name, name_in_zip)
-
-def make_pkg(pkg_dir, init_source=''):
-    os.mkdir(pkg_dir)
-    make_script(pkg_dir, '__init__', init_source)
-
-def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
-                 source, depth=1, compiled=False):
-    unlink = []
-    init_name = make_script(zip_dir, '__init__', '')
-    unlink.append(init_name)
-    init_basename = os.path.basename(init_name)
-    script_name = make_script(zip_dir, script_basename, source)
-    unlink.append(script_name)
-    if compiled:
-        init_name = compile_script(init_name)
-        script_name = compile_script(script_name)
-        unlink.extend((init_name, script_name))
-    pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
-    script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
-    zip_filename = zip_basename+os.extsep+'zip'
-    zip_name = os.path.join(zip_dir, zip_filename)
-    zip_file = zipfile.ZipFile(zip_name, 'w')
-    for name in pkg_names:
-        init_name_in_zip = os.path.join(name, init_basename)
-        zip_file.write(init_name, init_name_in_zip)
-    zip_file.write(script_name, script_name_in_zip)
-    zip_file.close()
-    for name in unlink:
-        os.unlink(name)
-    #if test.test_support.verbose:
-    #    zip_file = zipfile.ZipFile(zip_name, 'r')
-    #    print 'Contents of %r:' % zip_name
-    #    zip_file.printdir()
-    #    zip_file.close()
-    return zip_name, os.path.join(zip_name, script_name_in_zip)
+from test.support.script_helper import *
diff --git a/Lib/test/support/__init__.py b/Lib/test/support/__init__.py
new file mode 100644 (file)
index 0000000..fa75ff8
--- /dev/null
@@ -0,0 +1,1763 @@
+"""Supporting definitions for the Python regression tests."""
+
+if __name__ != 'test.support':
+    raise ImportError('test.support must be imported from the test package')
+
+import contextlib
+import errno
+import functools
+import gc
+import socket
+import stat
+import sys
+import os
+import platform
+import shutil
+import warnings
+import unittest
+import importlib
+import UserDict
+import re
+import time
+import struct
+import sysconfig
+try:
+    import thread
+except ImportError:
+    thread = None
+
+__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
+           "verbose", "use_resources", "max_memuse", "record_original_stdout",
+           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
+           "is_resource_enabled", "requires", "requires_mac_ver",
+           "find_unused_port", "bind_port",
+           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
+           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
+           "open_urlresource", "check_warnings", "check_py3k_warnings",
+           "CleanImport", "EnvironmentVarGuard", "captured_output",
+           "captured_stdout", "TransientResource", "transient_internet",
+           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
+           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
+           "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
+           "check_impl_detail", "get_attribute", "py3k_bytes",
+           "import_fresh_module", "threading_cleanup", "reap_children",
+           "strip_python_stderr", "IPV6_ENABLED", "run_with_tz"]
+
+class Error(Exception):
+    """Base class for regression test exceptions."""
+
+class TestFailed(Error):
+    """Test failed."""
+
+class ResourceDenied(unittest.SkipTest):
+    """Test skipped because it requested a disallowed resource.
+
+    This is raised when a test calls requires() for a resource that
+    has not been enabled.  It is used to distinguish between expected
+    and unexpected skips.
+    """
+
+@contextlib.contextmanager
+def _ignore_deprecated_imports(ignore=True):
+    """Context manager to suppress package and module deprecation
+    warnings when importing them.
+
+    If ignore is False, this context manager has no effect."""
+    if ignore:
+        with warnings.catch_warnings():
+            warnings.filterwarnings("ignore", ".+ (module|package)",
+                                    DeprecationWarning)
+            yield
+    else:
+        yield
+
+
+def import_module(name, deprecated=False):
+    """Import and return the module to be tested, raising SkipTest if
+    it is not available.
+
+    If deprecated is True, any module or package deprecation messages
+    will be suppressed."""
+    with _ignore_deprecated_imports(deprecated):
+        try:
+            return importlib.import_module(name)
+        except ImportError, msg:
+            raise unittest.SkipTest(str(msg))
+
+
+def _save_and_remove_module(name, orig_modules):
+    """Helper function to save and remove a module from sys.modules
+
+       Raise ImportError if the module can't be imported."""
+    # try to import the module and raise an error if it can't be imported
+    if name not in sys.modules:
+        __import__(name)
+        del sys.modules[name]
+    for modname in list(sys.modules):
+        if modname == name or modname.startswith(name + '.'):
+            orig_modules[modname] = sys.modules[modname]
+            del sys.modules[modname]
+
+def _save_and_block_module(name, orig_modules):
+    """Helper function to save and block a module in sys.modules
+
+       Return True if the module was in sys.modules, False otherwise."""
+    saved = True
+    try:
+        orig_modules[name] = sys.modules[name]
+    except KeyError:
+        saved = False
+    sys.modules[name] = None
+    return saved
+
+
+def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
+    """Imports and returns a module, deliberately bypassing the sys.modules cache
+    and importing a fresh copy of the module. Once the import is complete,
+    the sys.modules cache is restored to its original state.
+
+    Modules named in fresh are also imported anew if needed by the import.
+    If one of these modules can't be imported, None is returned.
+
+    Importing of modules named in blocked is prevented while the fresh import
+    takes place.
+
+    If deprecated is True, any module or package deprecation messages
+    will be suppressed."""
+    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
+    # checks to make sure that this utility function is working as expected
+    with _ignore_deprecated_imports(deprecated):
+        # Keep track of modules saved for later restoration as well
+        # as those which just need a blocking entry removed
+        orig_modules = {}
+        names_to_remove = []
+        _save_and_remove_module(name, orig_modules)
+        try:
+            for fresh_name in fresh:
+                _save_and_remove_module(fresh_name, orig_modules)
+            for blocked_name in blocked:
+                if not _save_and_block_module(blocked_name, orig_modules):
+                    names_to_remove.append(blocked_name)
+            fresh_module = importlib.import_module(name)
+        except ImportError:
+            fresh_module = None
+        finally:
+            for orig_name, module in orig_modules.items():
+                sys.modules[orig_name] = module
+            for name_to_remove in names_to_remove:
+                del sys.modules[name_to_remove]
+        return fresh_module
+
+
+def get_attribute(obj, name):
+    """Get an attribute, raising SkipTest if AttributeError is raised."""
+    try:
+        attribute = getattr(obj, name)
+    except AttributeError:
+        raise unittest.SkipTest("module %s has no attribute %s" % (
+            obj.__name__, name))
+    else:
+        return attribute
+
+
+verbose = 1              # Flag set to 0 by regrtest.py
+use_resources = None     # Flag set to [] by regrtest.py
+max_memuse = 0           # Disable bigmem tests (they will still be run with
+                         # small sizes, to make sure they work.)
+real_max_memuse = 0
+
+# _original_stdout is meant to hold stdout at the time regrtest began.
+# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
+# The point is to have some flavor of stdout the user can actually see.
+_original_stdout = None
+def record_original_stdout(stdout):
+    global _original_stdout
+    _original_stdout = stdout
+
+def get_original_stdout():
+    return _original_stdout or sys.stdout
+
+def unload(name):
+    try:
+        del sys.modules[name]
+    except KeyError:
+        pass
+
+def _force_run(path, func, *args):
+    try:
+        return func(*args)
+    except EnvironmentError as err:
+        if verbose >= 2:
+            print('%s: %s' % (err.__class__.__name__, err))
+            print('re-run %s%r' % (func.__name__, args))
+        os.chmod(path, stat.S_IRWXU)
+        return func(*args)
+
+if sys.platform.startswith("win"):
+    def _waitfor(func, pathname, waitall=False):
+        # Perform the operation
+        func(pathname)
+        # Now setup the wait loop
+        if waitall:
+            dirname = pathname
+        else:
+            dirname, name = os.path.split(pathname)
+            dirname = dirname or '.'
+        # Check for `pathname` to be removed from the filesystem.
+        # The exponential backoff of the timeout amounts to a total
+        # of ~1 second after which the deletion is probably an error
+        # anyway.
+        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
+        # required when contention occurs.
+        timeout = 0.001
+        while timeout < 1.0:
+            # Note we are only testing for the existence of the file(s) in
+            # the contents of the directory regardless of any security or
+            # access rights.  If we have made it this far, we have sufficient
+            # permissions to do that much using Python's equivalent of the
+            # Windows API FindFirstFile.
+            # Other Windows APIs can fail or give incorrect results when
+            # dealing with files that are pending deletion.
+            L = os.listdir(dirname)
+            if not (L if waitall else name in L):
+                return
+            # Increase the timeout and try again
+            time.sleep(timeout)
+            timeout *= 2
+        warnings.warn('tests may fail, delete still pending for ' + pathname,
+                      RuntimeWarning, stacklevel=4)
+
+    def _unlink(filename):
+        _waitfor(os.unlink, filename)
+
+    def _rmdir(dirname):
+        _waitfor(os.rmdir, dirname)
+
+    def _rmtree(path):
+        def _rmtree_inner(path):
+            for name in _force_run(path, os.listdir, path):
+                fullname = os.path.join(path, name)
+                if os.path.isdir(fullname):
+                    _waitfor(_rmtree_inner, fullname, waitall=True)
+                    _force_run(fullname, os.rmdir, fullname)
+                else:
+                    _force_run(fullname, os.unlink, fullname)
+        _waitfor(_rmtree_inner, path, waitall=True)
+        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
+else:
+    _unlink = os.unlink
+    _rmdir = os.rmdir
+
+    def _rmtree(path):
+        try:
+            shutil.rmtree(path)
+            return
+        except EnvironmentError:
+            pass
+
+        def _rmtree_inner(path):
+            for name in _force_run(path, os.listdir, path):
+                fullname = os.path.join(path, name)
+                try:
+                    mode = os.lstat(fullname).st_mode
+                except EnvironmentError:
+                    mode = 0
+                if stat.S_ISDIR(mode):
+                    _rmtree_inner(fullname)
+                    _force_run(path, os.rmdir, fullname)
+                else:
+                    _force_run(path, os.unlink, fullname)
+        _rmtree_inner(path)
+        os.rmdir(path)
+
+def unlink(filename):
+    try:
+        _unlink(filename)
+    except OSError:
+        pass
+
+def rmdir(dirname):
+    try:
+        _rmdir(dirname)
+    except OSError as error:
+        # The directory need not exist.
+        if error.errno != errno.ENOENT:
+            raise
+
+def rmtree(path):
+    try:
+        _rmtree(path)
+    except OSError, e:
+        # Unix returns ENOENT, Windows returns ESRCH.
+        if e.errno not in (errno.ENOENT, errno.ESRCH):
+            raise
+
+def forget(modname):
+    '''"Forget" a module was ever imported by removing it from sys.modules and
+    deleting any .pyc and .pyo files.'''
+    unload(modname)
+    for dirname in sys.path:
+        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
+        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
+        # the chance exists that there is no .pyc (and thus the 'try' statement
+        # is exited) but there is a .pyo file.
+        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
+
+# Check whether a gui is actually available
+def _is_gui_available():
+    if hasattr(_is_gui_available, 'result'):
+        return _is_gui_available.result
+    reason = None
+    if sys.platform.startswith('win'):
+        # if Python is running as a service (such as the buildbot service),
+        # gui interaction may be disallowed
+        import ctypes
+        import ctypes.wintypes
+        UOI_FLAGS = 1
+        WSF_VISIBLE = 0x0001
+        class USEROBJECTFLAGS(ctypes.Structure):
+            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
+                        ("fReserved", ctypes.wintypes.BOOL),
+                        ("dwFlags", ctypes.wintypes.DWORD)]
+        dll = ctypes.windll.user32
+        h = dll.GetProcessWindowStation()
+        if not h:
+            raise ctypes.WinError()
+        uof = USEROBJECTFLAGS()
+        needed = ctypes.wintypes.DWORD()
+        res = dll.GetUserObjectInformationW(h,
+            UOI_FLAGS,
+            ctypes.byref(uof),
+            ctypes.sizeof(uof),
+            ctypes.byref(needed))
+        if not res:
+            raise ctypes.WinError()
+        if not bool(uof.dwFlags & WSF_VISIBLE):
+            reason = "gui not available (WSF_VISIBLE flag not set)"
+    elif sys.platform == 'darwin':
+        # The Aqua Tk implementations on OS X can abort the process if
+        # being called in an environment where a window server connection
+        # cannot be made, for instance when invoked by a buildbot or ssh
+        # process not running under the same user id as the current console
+        # user.  To avoid that, raise an exception if the window manager
+        # connection is not available.
+        from ctypes import cdll, c_int, pointer, Structure
+        from ctypes.util import find_library
+
+        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
+
+        if app_services.CGMainDisplayID() == 0:
+            reason = "gui tests cannot run without OS X window manager"
+        else:
+            class ProcessSerialNumber(Structure):
+                _fields_ = [("highLongOfPSN", c_int),
+                            ("lowLongOfPSN", c_int)]
+            psn = ProcessSerialNumber()
+            psn_p = pointer(psn)
+            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
+                  (app_services.SetFrontProcess(psn_p) < 0) ):
+                reason = "cannot run without OS X gui process"
+
+    # check on every platform whether tkinter can actually do anything
+    if not reason:
+        try:
+            from Tkinter import Tk
+            root = Tk()
+            root.withdraw()
+            root.update()
+            root.destroy()
+        except Exception as e:
+            err_string = str(e)
+            if len(err_string) > 50:
+                err_string = err_string[:50] + ' [...]'
+            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
+                                                           err_string)
+
+    _is_gui_available.reason = reason
+    _is_gui_available.result = not reason
+
+    return _is_gui_available.result
+
+def is_resource_enabled(resource):
+    """Test whether a resource is enabled.
+
+    Known resources are set by regrtest.py.  If not running under regrtest.py,
+    all resources are assumed enabled unless use_resources has been set.
+    """
+    return use_resources is None or resource in use_resources
+
+def requires(resource, msg=None):
+    """Raise ResourceDenied if the specified resource is not available."""
+    if not is_resource_enabled(resource):
+        if msg is None:
+            msg = "Use of the `%s' resource not enabled" % resource
+        raise ResourceDenied(msg)
+    if resource == 'gui' and not _is_gui_available():
+        raise ResourceDenied(_is_gui_available.reason)
+
+def requires_mac_ver(*min_version):
+    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
+    version if less than min_version.
+
+    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
+    is lesser than 10.5.
+    """
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kw):
+            if sys.platform == 'darwin':
+                version_txt = platform.mac_ver()[0]
+                try:
+                    version = tuple(map(int, version_txt.split('.')))
+                except ValueError:
+                    pass
+                else:
+                    if version < min_version:
+                        min_version_txt = '.'.join(map(str, min_version))
+                        raise unittest.SkipTest(
+                            "Mac OS X %s or higher required, not %s"
+                            % (min_version_txt, version_txt))
+            return func(*args, **kw)
+        wrapper.min_version = min_version
+        return wrapper
+    return decorator
+
+
+# Don't use "localhost", since resolving it uses the DNS under recent
+# Windows versions (see issue #18792).
+HOST = "127.0.0.1"
+HOSTv6 = "::1"
+
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+    """Returns an unused port that should be suitable for binding.  This is
+    achieved by creating a temporary socket with the same family and type as
+    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+    the specified host address (defaults to 0.0.0.0) with the port set to 0,
+    eliciting an unused ephemeral port from the OS.  The temporary socket is
+    then closed and deleted, and the ephemeral port is returned.
+
+    Either this method or bind_port() should be used for any tests where a
+    server socket needs to be bound to a particular port for the duration of
+    the test.  Which one to use depends on whether the calling code is creating
+    a python socket, or if an unused port needs to be provided in a constructor
+    or passed to an external program (i.e. the -accept argument to openssl's
+    s_server mode).  Always prefer bind_port() over find_unused_port() where
+    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
+    socket is bound to a hard coded port, the ability to run multiple instances
+    of the test simultaneously on the same host is compromised, which makes the
+    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+    may simply manifest as a failed test, which can be recovered from without
+    intervention in most cases, but on Windows, the entire python process can
+    completely and utterly wedge, requiring someone to log in to the buildbot
+    and manually kill the affected process.
+
+    (This is easy to reproduce on Windows, unfortunately, and can be traced to
+    the SO_REUSEADDR socket option having different semantics on Windows versus
+    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+    listen and then accept connections on identical host/ports.  An EADDRINUSE
+    socket.error will be raised at some point (depending on the platform and
+    the order bind and listen were called on each socket).
+
+    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+    will ever be raised when attempting to bind two identical host/ports. When
+    accept() is called on each socket, the second caller's process will steal
+    the port from the first caller, leaving them both in an awkwardly wedged
+    state where they'll no longer respond to any signals or graceful kills, and
+    must be forcibly killed via OpenProcess()/TerminateProcess().
+
+    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+    instead of SO_REUSEADDR, which effectively affords the same semantics as
+    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
+    Source world compared to Windows ones, this is a common mistake.  A quick
+    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+    openssl.exe is called with the 's_server' option, for example. See
+    http://bugs.python.org/issue2550 for more info.  The following site also
+    has a very thorough description about the implications of both REUSEADDR
+    and EXCLUSIVEADDRUSE on Windows:
+    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+    XXX: although this approach is a vast improvement on previous attempts to
+    elicit unused ports, it rests heavily on the assumption that the ephemeral
+    port returned to us by the OS won't immediately be dished back out to some
+    other process when we close and delete our temporary socket but before our
+    calling code has a chance to bind the returned port.  We can deal with this
+    issue if/when we come across it."""
+    tempsock = socket.socket(family, socktype)
+    port = bind_port(tempsock)
+    tempsock.close()
+    del tempsock
+    return port
+
+def bind_port(sock, host=HOST):
+    """Bind the socket to a free port and return the port number.  Relies on
+    ephemeral ports in order to ensure we are using an unbound port.  This is
+    important as many tests may be running simultaneously, especially in a
+    buildbot environment.  This method raises an exception if the sock.family
+    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
+    for TCP/IP sockets.  The only case for setting these options is testing
+    multicasting via multiple UDP sockets.
+
+    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+    on Windows), it will be set on the socket.  This will prevent anyone else
+    from bind()'ing to our host/port for the duration of the test.
+    """
+    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+        if hasattr(socket, 'SO_REUSEADDR'):
+            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+                raise TestFailed("tests should never set the SO_REUSEADDR "   \
+                                 "socket option on TCP/IP sockets!")
+        if hasattr(socket, 'SO_REUSEPORT'):
+            try:
+                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
+                                     "socket option on TCP/IP sockets!")
+            except EnvironmentError:
+                # Python's socket module was compiled using modern headers
+                # thus defining SO_REUSEPORT but this process is running
+                # under an older kernel that does not support SO_REUSEPORT.
+                pass
+        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+    sock.bind((host, 0))
+    port = sock.getsockname()[1]
+    return port
+
+def _is_ipv6_enabled():
+    """Check whether IPv6 is enabled on this host."""
+    if socket.has_ipv6:
+        sock = None
+        try:
+            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+            sock.bind((HOSTv6, 0))
+            return True
+        except socket.error:
+            pass
+        finally:
+            if sock:
+                sock.close()
+    return False
+
+IPV6_ENABLED = _is_ipv6_enabled()
+
+def system_must_validate_cert(f):
+    """Skip the test on TLS certificate validation failures."""
+    @functools.wraps(f)
+    def dec(*args, **kwargs):
+        try:
+            f(*args, **kwargs)
+        except IOError as e:
+            if "CERTIFICATE_VERIFY_FAILED" in str(e):
+                raise unittest.SkipTest("system does not contain "
+                                        "necessary certificates")
+            raise
+    return dec
+
+FUZZ = 1e-6
+
+def fcmp(x, y): # fuzzy comparison function
+    if isinstance(x, float) or isinstance(y, float):
+        try:
+            fuzz = (abs(x) + abs(y)) * FUZZ
+            if abs(x-y) <= fuzz:
+                return 0
+        except:
+            pass
+    elif type(x) == type(y) and isinstance(x, (tuple, list)):
+        for i in range(min(len(x), len(y))):
+            outcome = fcmp(x[i], y[i])
+            if outcome != 0:
+                return outcome
+        return (len(x) > len(y)) - (len(x) < len(y))
+    return (x > y) - (x < y)
+
+
+# A constant likely larger than the underlying OS pipe buffer size, to
+# make writes blocking.
+# Windows limit seems to be around 512 B, and many Unix kernels have a
+# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
+# (see issue #17835 for a discussion of this number).
+PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
+
+# A constant likely larger than the underlying OS socket buffer size, to make
+# writes blocking.
+# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
+# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
+# for a discussion of this number).
+SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
+
+is_jython = sys.platform.startswith('java')
+
+try:
+    unicode
+    have_unicode = True
+except NameError:
+    have_unicode = False
+
+requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
+
+def u(s):
+    return unicode(s, 'unicode-escape')
+
+# FS_NONASCII: non-ASCII Unicode character encodable by
+# sys.getfilesystemencoding(), or None if there is no such character.
+FS_NONASCII = None
+if have_unicode:
+    for character in (
+        # First try printable and common characters to have a readable filename.
+        # For each character, the encoding list are just example of encodings able
+        # to encode the character (the list is not exhaustive).
+
+        # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
+        unichr(0x00E6),
+        # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
+        unichr(0x0130),
+        # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
+        unichr(0x0141),
+        # U+03C6 (Greek Small Letter Phi): cp1253
+        unichr(0x03C6),
+        # U+041A (Cyrillic Capital Letter Ka): cp1251
+        unichr(0x041A),
+        # U+05D0 (Hebrew Letter Alef): Encodable to cp424
+        unichr(0x05D0),
+        # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
+        unichr(0x060C),
+        # U+062A (Arabic Letter Teh): cp720
+        unichr(0x062A),
+        # U+0E01 (Thai Character Ko Kai): cp874
+        unichr(0x0E01),
+
+        # Then try more "special" characters. "special" because they may be
+        # interpreted or displayed differently depending on the exact locale
+        # encoding and the font.
+
+        # U+00A0 (No-Break Space)
+        unichr(0x00A0),
+        # U+20AC (Euro Sign)
+        unichr(0x20AC),
+    ):
+        try:
+            character.encode(sys.getfilesystemencoding())\
+                     .decode(sys.getfilesystemencoding())
+        except UnicodeError:
+            pass
+        else:
+            FS_NONASCII = character
+            break
+
+# Filename used for testing
+if os.name == 'java':
+    # Jython disallows @ in module names
+    TESTFN = '$test'
+elif os.name == 'riscos':
+    TESTFN = 'testfile'
+else:
+    TESTFN = '@test'
+    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
+    if have_unicode:
+        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
+        # TESTFN_UNICODE is a filename that can be encoded using the
+        # file system encoding, but *not* with the default (ascii) encoding
+        if isinstance('', unicode):
+            # python -U
+            # XXX perhaps unicode() should accept Unicode strings?
+            TESTFN_UNICODE = "@test-\xe0\xf2"
+        else:
+            # 2 latin characters.
+            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
+        TESTFN_ENCODING = sys.getfilesystemencoding()
+        # TESTFN_UNENCODABLE is a filename that should *not* be
+        # able to be encoded by *either* the default or filesystem encoding.
+        # This test really only makes sense on Windows NT platforms
+        # which have special Unicode support in posixmodule.
+        if (not hasattr(sys, "getwindowsversion") or
+                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
+            TESTFN_UNENCODABLE = None
+        else:
+            # Japanese characters (I think - from bug 846133)
+            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
+            try:
+                # XXX - Note - should be using TESTFN_ENCODING here - but for
+                # Windows, "mbcs" currently always operates as if in
+                # errors=ignore' mode - hence we get '?' characters rather than
+                # the exception.  'Latin1' operates as we expect - ie, fails.
+                # See [ 850997 ] mbcs encoding ignores errors
+                TESTFN_UNENCODABLE.encode("Latin1")
+            except UnicodeEncodeError:
+                pass
+            else:
+                print \
+                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
+                'Unicode filename tests may not be effective' \
+                % TESTFN_UNENCODABLE
+
+
+# Disambiguate TESTFN for parallel testing, while letting it remain a valid
+# module name.
+TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
+
+# Save the initial cwd
+SAVEDCWD = os.getcwd()
+
+@contextlib.contextmanager
+def change_cwd(path, quiet=False):
+    """Return a context manager that changes the current working directory.
+
+    Arguments:
+
+      path: the directory to use as the temporary current working directory.
+
+      quiet: if False (the default), the context manager raises an exception
+        on error.  Otherwise, it issues only a warning and keeps the current
+        working directory the same.
+
+    """
+    saved_dir = os.getcwd()
+    try:
+        os.chdir(path)
+    except OSError:
+        if not quiet:
+            raise
+        warnings.warn('tests may fail, unable to change CWD to: ' + path,
+                      RuntimeWarning, stacklevel=3)
+    try:
+        yield os.getcwd()
+    finally:
+        os.chdir(saved_dir)
+
+
+@contextlib.contextmanager
+def temp_cwd(name='tempcwd', quiet=False):
+    """
+    Context manager that creates a temporary directory and set it as CWD.
+
+    The new CWD is created in the current directory and it's named *name*.
+    If *quiet* is False (default) and it's not possible to create or change
+    the CWD, an error is raised.  If it's True, only a warning is raised
+    and the original CWD is used.
+    """
+    if (have_unicode and isinstance(name, unicode) and
+        not os.path.supports_unicode_filenames):
+        try:
+            name = name.encode(sys.getfilesystemencoding() or 'ascii')
+        except UnicodeEncodeError:
+            if not quiet:
+                raise unittest.SkipTest('unable to encode the cwd name with '
+                                        'the filesystem encoding.')
+    saved_dir = os.getcwd()
+    is_temporary = False
+    try:
+        os.mkdir(name)
+        os.chdir(name)
+        is_temporary = True
+    except OSError:
+        if not quiet:
+            raise
+        warnings.warn('tests may fail, unable to change the CWD to ' + name,
+                      RuntimeWarning, stacklevel=3)
+    try:
+        yield os.getcwd()
+    finally:
+        os.chdir(saved_dir)
+        if is_temporary:
+            rmtree(name)
+
+# TEST_HOME_DIR refers to the top level directory of the "test" package
+# that contains Python's regression test suite
+TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
+TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
+
+# TEST_DATA_DIR is used as a target download location for remote resources
+TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
+
+def findfile(file, subdir=None):
+    """Try to find a file on sys.path and the working directory.  If it is not
+    found the argument passed to the function is returned (this does not
+    necessarily signal failure; could still be the legitimate path)."""
+    if os.path.isabs(file):
+        return file
+    if subdir is not None:
+        file = os.path.join(subdir, file)
+    path = [TEST_HOME_DIR] + sys.path
+    for dn in path:
+        fn = os.path.join(dn, file)
+        if os.path.exists(fn): return fn
+    return file
+
+def sortdict(dict):
+    "Like repr(dict), but in sorted order."
+    items = dict.items()
+    items.sort()
+    reprpairs = ["%r: %r" % pair for pair in items]
+    withcommas = ", ".join(reprpairs)
+    return "{%s}" % withcommas
+
+def make_bad_fd():
+    """
+    Create an invalid file descriptor by opening and closing a file and return
+    its fd.
+    """
+    file = open(TESTFN, "wb")
+    try:
+        return file.fileno()
+    finally:
+        file.close()
+        unlink(TESTFN)
+
+def check_syntax_error(testcase, statement):
+    testcase.assertRaises(SyntaxError, compile, statement,
+                          '<test string>', 'exec')
+
+def open_urlresource(url, check=None):
+    import urlparse, urllib2
+
+    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
+
+    fn = os.path.join(TEST_DATA_DIR, filename)
+
+    def check_valid_file(fn):
+        f = open(fn)
+        if check is None:
+            return f
+        elif check(f):
+            f.seek(0)
+            return f
+        f.close()
+
+    if os.path.exists(fn):
+        f = check_valid_file(fn)
+        if f is not None:
+            return f
+        unlink(fn)
+
+    # Verify the requirement before downloading the file
+    requires('urlfetch')
+
+    print >> get_original_stdout(), '\tfetching %s ...' % url
+    f = urllib2.urlopen(url, timeout=15)
+    try:
+        with open(fn, "wb") as out:
+            s = f.read()
+            while s:
+                out.write(s)
+                s = f.read()
+    finally:
+        f.close()
+
+    f = check_valid_file(fn)
+    if f is not None:
+        return f
+    raise TestFailed('invalid resource "%s"' % fn)
+
+
+class WarningsRecorder(object):
+    """Convenience wrapper for the warnings list returned on
+       entry to the warnings.catch_warnings() context manager.
+    """
+    def __init__(self, warnings_list):
+        self._warnings = warnings_list
+        self._last = 0
+
+    def __getattr__(self, attr):
+        if len(self._warnings) > self._last:
+            return getattr(self._warnings[-1], attr)
+        elif attr in warnings.WarningMessage._WARNING_DETAILS:
+            return None
+        raise AttributeError("%r has no attribute %r" % (self, attr))
+
+    @property
+    def warnings(self):
+        return self._warnings[self._last:]
+
+    def reset(self):
+        self._last = len(self._warnings)
+
+
+def _filterwarnings(filters, quiet=False):
+    """Catch the warnings, then check if all the expected
+    warnings have been raised and re-raise unexpected warnings.
+    If 'quiet' is True, only re-raise the unexpected warnings.
+    """
+    # Clear the warning registry of the calling module
+    # in order to re-raise the warnings.
+    frame = sys._getframe(2)
+    registry = frame.f_globals.get('__warningregistry__')
+    if registry:
+        registry.clear()
+    with warnings.catch_warnings(record=True) as w:
+        # Set filter "always" to record all warnings.  Because
+        # test_warnings swap the module, we need to look up in
+        # the sys.modules dictionary.
+        sys.modules['warnings'].simplefilter("always")
+        yield WarningsRecorder(w)
+    # Filter the recorded warnings
+    reraise = [warning.message for warning in w]
+    missing = []
+    for msg, cat in filters:
+        seen = False
+        for exc in reraise[:]:
+            message = str(exc)
+            # Filter out the matching messages
+            if (re.match(msg, message, re.I) and
+                issubclass(exc.__class__, cat)):
+                seen = True
+                reraise.remove(exc)
+        if not seen and not quiet:
+            # This filter caught nothing
+            missing.append((msg, cat.__name__))
+    if reraise:
+        raise AssertionError("unhandled warning %r" % reraise[0])
+    if missing:
+        raise AssertionError("filter (%r, %s) did not catch any warning" %
+                             missing[0])
+
+
+@contextlib.contextmanager
+def check_warnings(*filters, **kwargs):
+    """Context manager to silence warnings.
+
+    Accept 2-tuples as positional arguments:
+        ("message regexp", WarningCategory)
+
+    Optional argument:
+     - if 'quiet' is True, it does not fail if a filter catches nothing
+        (default True without argument,
+         default False if some filters are defined)
+
+    Without argument, it defaults to:
+        check_warnings(("", Warning), quiet=True)
+    """
+    quiet = kwargs.get('quiet')
+    if not filters:
+        filters = (("", Warning),)
+        # Preserve backward compatibility
+        if quiet is None:
+            quiet = True
+    return _filterwarnings(filters, quiet)
+
+
+@contextlib.contextmanager
+def check_py3k_warnings(*filters, **kwargs):
+    """Context manager to silence py3k warnings.
+
+    Accept 2-tuples as positional arguments:
+        ("message regexp", WarningCategory)
+
+    Optional argument:
+     - if 'quiet' is True, it does not fail if a filter catches nothing
+        (default False)
+
+    Without argument, it defaults to:
+        check_py3k_warnings(("", DeprecationWarning), quiet=False)
+    """
+    if sys.py3kwarning:
+        if not filters:
+            filters = (("", DeprecationWarning),)
+    else:
+        # It should not raise any py3k warning
+        filters = ()
+    return _filterwarnings(filters, kwargs.get('quiet'))
+
+
+class CleanImport(object):
+    """Context manager to force import to return a new module reference.
+
+    This is useful for testing module-level behaviours, such as
+    the emission of a DeprecationWarning on import.
+
+    Use like this:
+
+        with CleanImport("foo"):
+            importlib.import_module("foo") # new reference
+    """
+
+    def __init__(self, *module_names):
+        self.original_modules = sys.modules.copy()
+        for module_name in module_names:
+            if module_name in sys.modules:
+                module = sys.modules[module_name]
+                # It is possible that module_name is just an alias for
+                # another module (e.g. stub for modules renamed in 3.x).
+                # In that case, we also need delete the real module to clear
+                # the import cache.
+                if module.__name__ != module_name:
+                    del sys.modules[module.__name__]
+                del sys.modules[module_name]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        sys.modules.update(self.original_modules)
+
+
+class EnvironmentVarGuard(UserDict.DictMixin):
+
+    """Class to help protect the environment variable properly.  Can be used as
+    a context manager."""
+
+    def __init__(self):
+        self._environ = os.environ
+        self._changed = {}
+
+    def __getitem__(self, envvar):
+        return self._environ[envvar]
+
+    def __setitem__(self, envvar, value):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = self._environ.get(envvar)
+        self._environ[envvar] = value
+
+    def __delitem__(self, envvar):
+        # Remember the initial value on the first access
+        if envvar not in self._changed:
+            self._changed[envvar] = self._environ.get(envvar)
+        if envvar in self._environ:
+            del self._environ[envvar]
+
+    def keys(self):
+        return self._environ.keys()
+
+    def set(self, envvar, value):
+        self[envvar] = value
+
+    def unset(self, envvar):
+        del self[envvar]
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        for (k, v) in self._changed.items():
+            if v is None:
+                if k in self._environ:
+                    del self._environ[k]
+            else:
+                self._environ[k] = v
+        os.environ = self._environ
+
+
+class DirsOnSysPath(object):
+    """Context manager to temporarily add directories to sys.path.
+
+    This makes a copy of sys.path, appends any directories given
+    as positional arguments, then reverts sys.path to the copied
+    settings when the context ends.
+
+    Note that *all* sys.path modifications in the body of the
+    context manager, including replacement of the object,
+    will be reverted at the end of the block.
+    """
+
+    def __init__(self, *paths):
+        self.original_value = sys.path[:]
+        self.original_object = sys.path
+        sys.path.extend(paths)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *ignore_exc):
+        sys.path = self.original_object
+        sys.path[:] = self.original_value
+
+
+class TransientResource(object):
+
+    """Raise ResourceDenied if an exception is raised while the context manager
+    is in effect that matches the specified exception and attributes."""
+
+    def __init__(self, exc, **kwargs):
+        self.exc = exc
+        self.attrs = kwargs
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type_=None, value=None, traceback=None):
+        """If type_ is a subclass of self.exc and value has attributes matching
+        self.attrs, raise ResourceDenied.  Otherwise let the exception
+        propagate (if any)."""
+        if type_ is not None and issubclass(self.exc, type_):
+            for attr, attr_value in self.attrs.iteritems():
+                if not hasattr(value, attr):
+                    break
+                if getattr(value, attr) != attr_value:
+                    break
+            else:
+                raise ResourceDenied("an optional resource is not available")
+
+
+@contextlib.contextmanager
+def transient_internet(resource_name, timeout=30.0, errnos=()):
+    """Return a context manager that raises ResourceDenied when various issues
+    with the Internet connection manifest themselves as exceptions."""
+    default_errnos = [
+        ('ECONNREFUSED', 111),
+        ('ECONNRESET', 104),
+        ('EHOSTUNREACH', 113),
+        ('ENETUNREACH', 101),
+        ('ETIMEDOUT', 110),
+    ]
+    default_gai_errnos = [
+        ('EAI_AGAIN', -3),
+        ('EAI_FAIL', -4),
+        ('EAI_NONAME', -2),
+        ('EAI_NODATA', -5),
+        # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
+        # implementation actually returns WSANO_DATA i.e. 11004.
+        ('WSANO_DATA', 11004),
+    ]
+
+    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
+    captured_errnos = errnos
+    gai_errnos = []
+    if not captured_errnos:
+        captured_errnos = [getattr(errno, name, num)
+                           for (name, num) in default_errnos]
+        gai_errnos = [getattr(socket, name, num)
+                      for (name, num) in default_gai_errnos]
+
+    def filter_error(err):
+        n = getattr(err, 'errno', None)
+        if (isinstance(err, socket.timeout) or
+            (isinstance(err, socket.gaierror) and n in gai_errnos) or
+            n in captured_errnos):
+            if not verbose:
+                sys.stderr.write(denied.args[0] + "\n")
+            raise denied
+
+    old_timeout = socket.getdefaulttimeout()
+    try:
+        if timeout is not None:
+            socket.setdefaulttimeout(timeout)
+        yield
+    except IOError as err:
+        # urllib can wrap original socket errors multiple times (!), we must
+        # unwrap to get at the original error.
+        while True:
+            a = err.args
+            if len(a) >= 1 and isinstance(a[0], IOError):
+                err = a[0]
+            # The error can also be wrapped as args[1]:
+            #    except socket.error as msg:
+            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
+            elif len(a) >= 2 and isinstance(a[1], IOError):
+                err = a[1]
+            else:
+                break
+        filter_error(err)
+        raise
+    # XXX should we catch generic exceptions and look for their
+    # __cause__ or __context__?
+    finally:
+        socket.setdefaulttimeout(old_timeout)
+
+
+@contextlib.contextmanager
+def captured_output(stream_name):
+    """Return a context manager used by captured_stdout and captured_stdin
+    that temporarily replaces the sys stream *stream_name* with a StringIO."""
+    import StringIO
+    orig_stdout = getattr(sys, stream_name)
+    setattr(sys, stream_name, StringIO.StringIO())
+    try:
+        yield getattr(sys, stream_name)
+    finally:
+        setattr(sys, stream_name, orig_stdout)
+
+def captured_stdout():
+    """Capture the output of sys.stdout:
+
+       with captured_stdout() as s:
+           print "hello"
+       self.assertEqual(s.getvalue(), "hello")
+    """
+    return captured_output("stdout")
+
+def captured_stderr():
+    return captured_output("stderr")
+
+def captured_stdin():
+    return captured_output("stdin")
+
+def gc_collect():
+    """Force as many objects as possible to be collected.
+
+    In non-CPython implementations of Python, this is needed because timely
+    deallocation is not guaranteed by the garbage collector.  (Even in CPython
+    this can be the case in case of reference cycles.)  This means that __del__
+    methods may be called later than expected and weakrefs may remain alive for
+    longer than expected.  This function tries its best to force all garbage
+    objects to disappear.
+    """
+    gc.collect()
+    if is_jython:
+        time.sleep(0.1)
+    gc.collect()
+    gc.collect()
+
+
+_header = '2P'
+if hasattr(sys, "gettotalrefcount"):
+    _header = '2P' + _header
+_vheader = _header + 'P'
+
+def calcobjsize(fmt):
+    return struct.calcsize(_header + fmt + '0P')
+
+def calcvobjsize(fmt):
+    return struct.calcsize(_vheader + fmt + '0P')
+
+
+_TPFLAGS_HAVE_GC = 1<<14
+_TPFLAGS_HEAPTYPE = 1<<9
+
+def check_sizeof(test, o, size):
+    import _testcapi
+    result = sys.getsizeof(o)
+    # add GC header size
+    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
+        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
+        size += _testcapi.SIZEOF_PYGC_HEAD
+    msg = 'wrong size for %s: got %d, expected %d' \
+            % (type(o), result, size)
+    test.assertEqual(result, size, msg)
+
+
+#=======================================================================
+# Decorator for running a function in a different locale, correctly resetting
+# it afterwards.
+
+def run_with_locale(catstr, *locales):
+    def decorator(func):
+        def inner(*args, **kwds):
+            try:
+                import locale
+                category = getattr(locale, catstr)
+                orig_locale = locale.setlocale(category)
+            except AttributeError:
+                # if the test author gives us an invalid category string
+                raise
+            except:
+                # cannot retrieve original locale, so do nothing
+                locale = orig_locale = None
+            else:
+                for loc in locales:
+                    try:
+                        locale.setlocale(category, loc)
+                        break
+                    except:
+                        pass
+
+            # now run the function, resetting the locale on exceptions
+            try:
+                return func(*args, **kwds)
+            finally:
+                if locale and orig_locale:
+                    locale.setlocale(category, orig_locale)
+        inner.func_name = func.func_name
+        inner.__doc__ = func.__doc__
+        return inner
+    return decorator
+
+#=======================================================================
+# Decorator for running a function in a specific timezone, correctly
+# resetting it afterwards.
+
+def run_with_tz(tz):
+    def decorator(func):
+        def inner(*args, **kwds):
+            try:
+                tzset = time.tzset
+            except AttributeError:
+                raise unittest.SkipTest("tzset required")
+            if 'TZ' in os.environ:
+                orig_tz = os.environ['TZ']
+            else:
+                orig_tz = None
+            os.environ['TZ'] = tz
+            tzset()
+
+            # now run the function, resetting the tz on exceptions
+            try:
+                return func(*args, **kwds)
+            finally:
+                if orig_tz is None:
+                    del os.environ['TZ']
+                else:
+                    os.environ['TZ'] = orig_tz
+                time.tzset()
+
+        inner.__name__ = func.__name__
+        inner.__doc__ = func.__doc__
+        return inner
+    return decorator
+
+#=======================================================================
+# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
+
+# Some handy shorthands. Note that these are used for byte-limits as well
+# as size-limits, in the various bigmem tests
+_1M = 1024*1024
+_1G = 1024 * _1M
+_2G = 2 * _1G
+_4G = 4 * _1G
+
+MAX_Py_ssize_t = sys.maxsize
+
+def set_memlimit(limit):
+    global max_memuse
+    global real_max_memuse
+    sizes = {
+        'k': 1024,
+        'm': _1M,
+        'g': _1G,
+        't': 1024*_1G,
+    }
+    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+                 re.IGNORECASE | re.VERBOSE)
+    if m is None:
+        raise ValueError('Invalid memory limit %r' % (limit,))
+    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
+    real_max_memuse = memlimit
+    if memlimit > MAX_Py_ssize_t:
+        memlimit = MAX_Py_ssize_t
+    if memlimit < _2G - 1:
+        raise ValueError('Memory limit %r too low to be useful' % (limit,))
+    max_memuse = memlimit
+
+def bigmemtest(minsize, memuse, overhead=5*_1M):
+    """Decorator for bigmem tests.
+
+    'minsize' is the minimum useful size for the test (in arbitrary,
+    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
+    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
+    independent of the testsize, and defaults to 5Mb.
+
+    The decorator tries to guess a good value for 'size' and passes it to
+    the decorated test function. If minsize * memuse is more than the
+    allowed memory use (as defined by max_memuse), the test is skipped.
+    Otherwise, minsize is adjusted upward to use up to max_memuse.
+    """
+    def decorator(f):
+        def wrapper(self):
+            if not max_memuse:
+                # If max_memuse is 0 (the default),
+                # we still want to run the tests with size set to a few kb,
+                # to make sure they work. We still want to avoid using
+                # too much memory, though, but we do that noisily.
+                maxsize = 5147
+                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
+            else:
+                maxsize = int((max_memuse - overhead) / memuse)
+                if maxsize < minsize:
+                    # Really ought to print 'test skipped' or something
+                    if verbose:
+                        sys.stderr.write("Skipping %s because of memory "
+                                         "constraint\n" % (f.__name__,))
+                    return
+                # Try to keep some breathing room in memory use
+                maxsize = max(maxsize - 50 * _1M, minsize)
+            return f(self, maxsize)
+        wrapper.minsize = minsize
+        wrapper.memuse = memuse
+        wrapper.overhead = overhead
+        return wrapper
+    return decorator
+
+def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
+    def decorator(f):
+        def wrapper(self):
+            if not real_max_memuse:
+                maxsize = 5147
+            else:
+                maxsize = size
+
+            if ((real_max_memuse or not dry_run)
+                and real_max_memuse < maxsize * memuse):
+                if verbose:
+                    sys.stderr.write("Skipping %s because of memory "
+                                     "constraint\n" % (f.__name__,))
+                return
+
+            return f(self, maxsize)
+        wrapper.size = size
+        wrapper.memuse = memuse
+        wrapper.overhead = overhead
+        return wrapper
+    return decorator
+
+def bigaddrspacetest(f):
+    """Decorator for tests that fill the address space."""
+    def wrapper(self):
+        if max_memuse < MAX_Py_ssize_t:
+            if verbose:
+                sys.stderr.write("Skipping %s because of memory "
+                                 "constraint\n" % (f.__name__,))
+        else:
+            return f(self)
+    return wrapper
+
+#=======================================================================
+# unittest integration.
+
+class BasicTestRunner:
+    def run(self, test):
+        result = unittest.TestResult()
+        test(result)
+        return result
+
+def _id(obj):
+    return obj
+
+def requires_resource(resource):
+    if resource == 'gui' and not _is_gui_available():
+        return unittest.skip(_is_gui_available.reason)
+    if is_resource_enabled(resource):
+        return _id
+    else:
+        return unittest.skip("resource {0!r} is not enabled".format(resource))
+
+def cpython_only(test):
+    """
+    Decorator for tests only applicable on CPython.
+    """
+    return impl_detail(cpython=True)(test)
+
+def impl_detail(msg=None, **guards):
+    if check_impl_detail(**guards):
+        return _id
+    if msg is None:
+        guardnames, default = _parse_guards(guards)
+        if default:
+            msg = "implementation detail not available on {0}"
+        else:
+            msg = "implementation detail specific to {0}"
+        guardnames = sorted(guardnames.keys())
+        msg = msg.format(' or '.join(guardnames))
+    return unittest.skip(msg)
+
+def _parse_guards(guards):
+    # Returns a tuple ({platform_name: run_me}, default_value)
+    if not guards:
+        return ({'cpython': True}, False)
+    is_true = guards.values()[0]
+    assert guards.values() == [is_true] * len(guards)   # all True or all False
+    return (guards, not is_true)
+
+# Use the following check to guard CPython's implementation-specific tests --
+# or to run them only on the implementation(s) guarded by the arguments.
+def check_impl_detail(**guards):
+    """This function returns True or False depending on the host platform.
+       Examples:
+          if check_impl_detail():               # only on CPython (default)
+          if check_impl_detail(jython=True):    # only on Jython
+          if check_impl_detail(cpython=False):  # everywhere except on CPython
+    """
+    guards, default = _parse_guards(guards)
+    return guards.get(platform.python_implementation().lower(), default)
+
+
+
+def _run_suite(suite):
+    """Run tests from a unittest.TestSuite-derived class."""
+    if verbose:
+        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
+    else:
+        runner = BasicTestRunner()
+
+    result = runner.run(suite)
+    if not result.wasSuccessful():
+        if len(result.errors) == 1 and not result.failures:
+            err = result.errors[0][1]
+        elif len(result.failures) == 1 and not result.errors:
+            err = result.failures[0][1]
+        else:
+            err = "multiple errors occurred"
+            if not verbose:
+                err += "; run in verbose mode for details"
+        raise TestFailed(err)
+
+
+def run_unittest(*classes):
+    """Run tests from unittest.TestCase-derived classes."""
+    valid_types = (unittest.TestSuite, unittest.TestCase)
+    suite = unittest.TestSuite()
+    for cls in classes:
+        if isinstance(cls, str):
+            if cls in sys.modules:
+                suite.addTest(unittest.findTestCases(sys.modules[cls]))
+            else:
+                raise ValueError("str arguments must be keys in sys.modules")
+        elif isinstance(cls, valid_types):
+            suite.addTest(cls)
+        else:
+            suite.addTest(unittest.makeSuite(cls))
+    _run_suite(suite)
+
+#=======================================================================
+# Check for the presence of docstrings.
+
+HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
+                   sys.platform == 'win32' or
+                   sysconfig.get_config_var('WITH_DOC_STRINGS'))
+
+requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
+                                          "test requires docstrings")
+
+
+#=======================================================================
+# doctest driver.
+
+def run_doctest(module, verbosity=None):
+    """Run doctest on the given module.  Return (#failures, #tests).
+
+    If optional argument verbosity is not specified (or is None), pass
+    test.support's belief about verbosity on to doctest.  Else doctest's
+    usual behavior is used (it searches sys.argv for -v).
+    """
+
+    import doctest
+
+    if verbosity is None:
+        verbosity = verbose
+    else:
+        verbosity = None
+
+    # Direct doctest output (normally just errors) to real stdout; doctest
+    # output shouldn't be compared by regrtest.
+    save_stdout = sys.stdout
+    sys.stdout = get_original_stdout()
+    try:
+        f, t = doctest.testmod(module, verbose=verbosity)
+        if f:
+            raise TestFailed("%d of %d doctests failed" % (f, t))
+    finally:
+        sys.stdout = save_stdout
+    if verbose:
+        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
+    return f, t
+
+#=======================================================================
+# Threading support to prevent reporting refleaks when running regrtest.py -R
+
+# NOTE: we use thread._count() rather than threading.enumerate() (or the
+# moral equivalent thereof) because a threading.Thread object is still alive
+# until its __bootstrap() method has returned, even after it has been
+# unregistered from the threading module.
+# thread._count(), on the other hand, only gets decremented *after* the
+# __bootstrap() method has returned, which gives us reliable reference counts
+# at the end of a test run.
+
+def threading_setup():
+    if thread:
+        return thread._count(),
+    else:
+        return 1,
+
+def threading_cleanup(nb_threads):
+    if not thread:
+        return
+
+    _MAX_COUNT = 10
+    for count in range(_MAX_COUNT):
+        n = thread._count()
+        if n == nb_threads:
+            break
+        time.sleep(0.1)
+    # XXX print a warning in case of failure?
+
+def reap_threads(func):
+    """Use this function when threads are being used.  This will
+    ensure that the threads are cleaned up even when the test fails.
+    If threading is unavailable this function does nothing.
+    """
+    if not thread:
+        return func
+
+    @functools.wraps(func)
+    def decorator(*args):
+        key = threading_setup()
+        try:
+            return func(*args)
+        finally:
+            threading_cleanup(*key)
+    return decorator
+
+def reap_children():
+    """Use this function at the end of test_main() whenever sub-processes
+    are started.  This will help ensure that no extra children (zombies)
+    stick around to hog resources and create problems when looking
+    for refleaks.
+    """
+
+    # Reap all our dead child processes so we don't leave zombies around.
+    # These hog resources and might be causing some of the buildbots to die.
+    if hasattr(os, 'waitpid'):
+        any_process = -1
+        while True:
+            try:
+                # This will raise an exception on Windows.  That's ok.
+                pid, status = os.waitpid(any_process, os.WNOHANG)
+                if pid == 0:
+                    break
+            except:
+                break
+
+@contextlib.contextmanager
+def start_threads(threads, unlock=None):
+    threads = list(threads)
+    started = []
+    try:
+        try:
+            for t in threads:
+                t.start()
+                started.append(t)
+        except:
+            if verbose:
+                print("Can't start %d threads, only %d threads started" %
+                      (len(threads), len(started)))
+            raise
+        yield
+    finally:
+        if unlock:
+            unlock()
+        endtime = starttime = time.time()
+        for timeout in range(1, 16):
+            endtime += 60
+            for t in started:
+                t.join(max(endtime - time.time(), 0.01))
+            started = [t for t in started if t.isAlive()]
+            if not started:
+                break
+            if verbose:
+                print('Unable to join %d threads during a period of '
+                      '%d minutes' % (len(started), timeout))
+    started = [t for t in started if t.isAlive()]
+    if started:
+        raise AssertionError('Unable to join %d threads' % len(started))
+
+@contextlib.contextmanager
+def swap_attr(obj, attr, new_val):
+    """Temporary swap out an attribute with a new object.
+
+    Usage:
+        with swap_attr(obj, "attr", 5):
+            ...
+
+        This will set obj.attr to 5 for the duration of the with: block,
+        restoring the old value at the end of the block. If `attr` doesn't
+        exist on `obj`, it will be created and then deleted at the end of the
+        block.
+
+        The old value (or None if it doesn't exist) will be assigned to the
+        target of the "as" clause, if there is one.
+    """
+    if hasattr(obj, attr):
+        real_val = getattr(obj, attr)
+        setattr(obj, attr, new_val)
+        try:
+            yield real_val
+        finally:
+            setattr(obj, attr, real_val)
+    else:
+        setattr(obj, attr, new_val)
+        try:
+            yield
+        finally:
+            if hasattr(obj, attr):
+                delattr(obj, attr)
+
+@contextlib.contextmanager
+def swap_item(obj, item, new_val):
+    """Temporary swap out an item with a new object.
+
+    Usage:
+        with swap_item(obj, "item", 5):
+            ...
+
+        This will set obj["item"] to 5 for the duration of the with: block,
+        restoring the old value at the end of the block. If `item` doesn't
+        exist on `obj`, it will be created and then deleted at the end of the
+        block.
+
+        The old value (or None if it doesn't exist) will be assigned to the
+        target of the "as" clause, if there is one.
+    """
+    if item in obj:
+        real_val = obj[item]
+        obj[item] = new_val
+        try:
+            yield real_val
+        finally:
+            obj[item] = real_val
+    else:
+        obj[item] = new_val
+        try:
+            yield
+        finally:
+            if item in obj:
+                del obj[item]
+
+def py3k_bytes(b):
+    """Emulate the py3k bytes() constructor.
+
+    NOTE: This is only a best effort function.
+    """
+    try:
+        # memoryview?
+        return b.tobytes()
+    except AttributeError:
+        try:
+            # iterable of ints?
+            return b"".join(chr(x) for x in b)
+        except TypeError:
+            return bytes(b)
+
+def args_from_interpreter_flags():
+    """Return a list of command-line arguments reproducing the current
+    settings in sys.flags."""
+    import subprocess
+    return subprocess._args_from_interpreter_flags()
+
+def strip_python_stderr(stderr):
+    """Strip the stderr of a Python process from potential debug output
+    emitted by the interpreter.
+
+    This will typically be run on the result of the communicate() method
+    of a subprocess.Popen object.
+    """
+    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
+    return stderr
+
+
+def check_free_after_iterating(test, iter, cls, args=()):
+    class A(cls):
+        def __del__(self):
+            done[0] = True
+            try:
+                next(it)
+            except StopIteration:
+                pass
+
+    done = [False]
+    it = iter(A(*args))
+    # Issue 26494: Shouldn't crash
+    test.assertRaises(StopIteration, next, it)
+    # The sequence should be deallocated just after the end of iterating
+    gc_collect()
+    test.assertTrue(done[0])
+
+@contextlib.contextmanager
+def disable_gc():
+    have_gc = gc.isenabled()
+    gc.disable()
+    try:
+        yield
+    finally:
+        if have_gc:
+            gc.enable()
diff --git a/Lib/test/support/script_helper.py b/Lib/test/support/script_helper.py
new file mode 100644 (file)
index 0000000..e06cdc3
--- /dev/null
@@ -0,0 +1,170 @@
+# Common utility functions used by various script execution tests
+#  e.g. test_cmd_line, test_cmd_line_script and test_runpy
+
+import sys
+import os
+import re
+import os.path
+import tempfile
+import subprocess
+import py_compile
+import contextlib
+import shutil
+try:
+    import zipfile
+except ImportError:
+    # If Python is build without Unicode support, importing _io will
+    # fail, which, in turn, means that zipfile cannot be imported
+    # Most of this module can then still be used.
+    pass
+
+from test.support import strip_python_stderr
+
+# Executing the interpreter in a subprocess
+def _assert_python(expected_success, *args, **env_vars):
+    cmd_line = [sys.executable]
+    if not env_vars:
+        cmd_line.append('-E')
+    cmd_line.extend(args)
+    # Need to preserve the original environment, for in-place testing of
+    # shared library builds.
+    env = os.environ.copy()
+    env.update(env_vars)
+    p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                         env=env)
+    try:
+        out, err = p.communicate()
+    finally:
+        subprocess._cleanup()
+        p.stdout.close()
+        p.stderr.close()
+    rc = p.returncode
+    err =  strip_python_stderr(err)
+    if (rc and expected_success) or (not rc and not expected_success):
+        raise AssertionError(
+            "Process return code is %d, "
+            "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
+    return rc, out, err
+
+def assert_python_ok(*args, **env_vars):
+    """
+    Assert that running the interpreter with `args` and optional environment
+    variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
+    """
+    return _assert_python(True, *args, **env_vars)
+
+def assert_python_failure(*args, **env_vars):
+    """
+    Assert that running the interpreter with `args` and optional environment
+    variables `env_vars` fails and return a (return code, stdout, stderr) tuple.
+    """
+    return _assert_python(False, *args, **env_vars)
+
+def python_exit_code(*args):
+    cmd_line = [sys.executable, '-E']
+    cmd_line.extend(args)
+    with open(os.devnull, 'w') as devnull:
+        return subprocess.call(cmd_line, stdout=devnull,
+                                stderr=subprocess.STDOUT)
+
+def spawn_python(*args, **kwargs):
+    cmd_line = [sys.executable, '-E']
+    cmd_line.extend(args)
+    return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                            **kwargs)
+
+def kill_python(p):
+    p.stdin.close()
+    data = p.stdout.read()
+    p.stdout.close()
+    # try to cleanup the child so we don't appear to leak when running
+    # with regrtest -R.
+    p.wait()
+    subprocess._cleanup()
+    return data
+
+def run_python(*args, **kwargs):
+    if __debug__:
+        p = spawn_python(*args, **kwargs)
+    else:
+        p = spawn_python('-O', *args, **kwargs)
+    stdout_data = kill_python(p)
+    return p.wait(), stdout_data
+
+# Script creation utilities
+@contextlib.contextmanager
+def temp_dir():
+    dirname = tempfile.mkdtemp()
+    dirname = os.path.realpath(dirname)
+    try:
+        yield dirname
+    finally:
+        shutil.rmtree(dirname)
+
+def make_script(script_dir, script_basename, source):
+    script_filename = script_basename+os.extsep+'py'
+    script_name = os.path.join(script_dir, script_filename)
+    script_file = open(script_name, 'w')
+    script_file.write(source)
+    script_file.close()
+    return script_name
+
+def compile_script(script_name):
+    py_compile.compile(script_name, doraise=True)
+    if __debug__:
+        compiled_name = script_name + 'c'
+    else:
+        compiled_name = script_name + 'o'
+    return compiled_name
+
+def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
+    zip_filename = zip_basename+os.extsep+'zip'
+    zip_name = os.path.join(zip_dir, zip_filename)
+    zip_file = zipfile.ZipFile(zip_name, 'w')
+    if name_in_zip is None:
+        name_in_zip = os.path.basename(script_name)
+    zip_file.write(script_name, name_in_zip)
+    zip_file.close()
+    #if test.test_support.verbose:
+    #    zip_file = zipfile.ZipFile(zip_name, 'r')
+    #    print 'Contents of %r:' % zip_name
+    #    zip_file.printdir()
+    #    zip_file.close()
+    return zip_name, os.path.join(zip_name, name_in_zip)
+
+def make_pkg(pkg_dir, init_source=''):
+    os.mkdir(pkg_dir)
+    make_script(pkg_dir, '__init__', init_source)
+
+def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
+                 source, depth=1, compiled=False):
+    unlink = []
+    init_name = make_script(zip_dir, '__init__', '')
+    unlink.append(init_name)
+    init_basename = os.path.basename(init_name)
+    script_name = make_script(zip_dir, script_basename, source)
+    unlink.append(script_name)
+    if compiled:
+        init_name = compile_script(init_name)
+        script_name = compile_script(script_name)
+        unlink.extend((init_name, script_name))
+    pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
+    script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
+    zip_filename = zip_basename+os.extsep+'zip'
+    zip_name = os.path.join(zip_dir, zip_filename)
+    zip_file = zipfile.ZipFile(zip_name, 'w')
+    for name in pkg_names:
+        init_name_in_zip = os.path.join(name, init_basename)
+        zip_file.write(init_name, init_name_in_zip)
+    zip_file.write(script_name, script_name_in_zip)
+    zip_file.close()
+    for name in unlink:
+        os.unlink(name)
+    #if test.test_support.verbose:
+    #    zip_file = zipfile.ZipFile(zip_name, 'r')
+    #    print 'Contents of %r:' % zip_name
+    #    zip_file.printdir()
+    #    zip_file.close()
+    return zip_name, os.path.join(zip_name, script_name_in_zip)
index 459881170394f1840e77dd261243d96332687861..c98e49474282601f5a793c92630408347e5c05e3 100644 (file)
@@ -26,7 +26,7 @@ class CompilerTest(unittest.TestCase):
         # warning: if 'os' or 'test_support' are moved in some other dir,
         # they should be changed here.
         libdir = os.path.dirname(os.__file__)
-        testdir = os.path.dirname(test.test_support.__file__)
+        testdir = test.test_support.TEST_HOME_DIR
 
         for dir in [testdir]:
             for basename in "test_os.py",:
index 6f7ecd15d289310aaa4396bf5af488c69be21093..35a52db230521d5806fafed27da0b6041eef4005 100644 (file)
@@ -3,7 +3,7 @@
 import linecache
 import unittest
 import os.path
-from test import test_support as support
+from test import support
 
 
 FILENAME = linecache.__file__
@@ -11,7 +11,7 @@ INVALID_NAME = '!@$)(!@#_1'
 EMPTY = ''
 TESTS = 'inspect_fodder inspect_fodder2 mapping_tests'
 TESTS = TESTS.split()
-TEST_PATH = os.path.dirname(support.__file__)
+TEST_PATH = support.TEST_HOME_DIR
 MODULES = "linecache abc".split()
 MODULE_PATH = os.path.dirname(FILENAME)
 
index 976f301c0a9144d9b32e21b7b99d5ee7c4c823c8..3c894afcef5fcd0e9ad3115cfe3eaf9a1c0f4f5d 100644 (file)
-"""Supporting definitions for the Python regression tests."""
-
-if __name__ != 'test.test_support':
-    raise ImportError('test_support must be imported from the test package')
-
-import contextlib
-import errno
-import functools
-import gc
-import socket
-import stat
 import sys
-import os
-import platform
-import shutil
-import warnings
-import unittest
-import importlib
-import UserDict
-import re
-import time
-import struct
-import sysconfig
-try:
-    import thread
-except ImportError:
-    thread = None
-
-__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
-           "verbose", "use_resources", "max_memuse", "record_original_stdout",
-           "get_original_stdout", "unload", "unlink", "rmtree", "forget",
-           "is_resource_enabled", "requires", "requires_mac_ver",
-           "find_unused_port", "bind_port",
-           "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
-           "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
-           "open_urlresource", "check_warnings", "check_py3k_warnings",
-           "CleanImport", "EnvironmentVarGuard", "captured_output",
-           "captured_stdout", "TransientResource", "transient_internet",
-           "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
-           "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
-           "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
-           "check_impl_detail", "get_attribute", "py3k_bytes",
-           "import_fresh_module", "threading_cleanup", "reap_children",
-           "strip_python_stderr", "IPV6_ENABLED", "run_with_tz"]
-
-class Error(Exception):
-    """Base class for regression test exceptions."""
-
-class TestFailed(Error):
-    """Test failed."""
-
-class ResourceDenied(unittest.SkipTest):
-    """Test skipped because it requested a disallowed resource.
-
-    This is raised when a test calls requires() for a resource that
-    has not been enabled.  It is used to distinguish between expected
-    and unexpected skips.
-    """
-
-@contextlib.contextmanager
-def _ignore_deprecated_imports(ignore=True):
-    """Context manager to suppress package and module deprecation
-    warnings when importing them.
-
-    If ignore is False, this context manager has no effect."""
-    if ignore:
-        with warnings.catch_warnings():
-            warnings.filterwarnings("ignore", ".+ (module|package)",
-                                    DeprecationWarning)
-            yield
-    else:
-        yield
-
-
-def import_module(name, deprecated=False):
-    """Import and return the module to be tested, raising SkipTest if
-    it is not available.
-
-    If deprecated is True, any module or package deprecation messages
-    will be suppressed."""
-    with _ignore_deprecated_imports(deprecated):
-        try:
-            return importlib.import_module(name)
-        except ImportError, msg:
-            raise unittest.SkipTest(str(msg))
-
-
-def _save_and_remove_module(name, orig_modules):
-    """Helper function to save and remove a module from sys.modules
-
-       Raise ImportError if the module can't be imported."""
-    # try to import the module and raise an error if it can't be imported
-    if name not in sys.modules:
-        __import__(name)
-        del sys.modules[name]
-    for modname in list(sys.modules):
-        if modname == name or modname.startswith(name + '.'):
-            orig_modules[modname] = sys.modules[modname]
-            del sys.modules[modname]
-
-def _save_and_block_module(name, orig_modules):
-    """Helper function to save and block a module in sys.modules
-
-       Return True if the module was in sys.modules, False otherwise."""
-    saved = True
-    try:
-        orig_modules[name] = sys.modules[name]
-    except KeyError:
-        saved = False
-    sys.modules[name] = None
-    return saved
-
-
-def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
-    """Imports and returns a module, deliberately bypassing the sys.modules cache
-    and importing a fresh copy of the module. Once the import is complete,
-    the sys.modules cache is restored to its original state.
-
-    Modules named in fresh are also imported anew if needed by the import.
-    If one of these modules can't be imported, None is returned.
-
-    Importing of modules named in blocked is prevented while the fresh import
-    takes place.
-
-    If deprecated is True, any module or package deprecation messages
-    will be suppressed."""
-    # NOTE: test_heapq, test_json, and test_warnings include extra sanity
-    # checks to make sure that this utility function is working as expected
-    with _ignore_deprecated_imports(deprecated):
-        # Keep track of modules saved for later restoration as well
-        # as those which just need a blocking entry removed
-        orig_modules = {}
-        names_to_remove = []
-        _save_and_remove_module(name, orig_modules)
-        try:
-            for fresh_name in fresh:
-                _save_and_remove_module(fresh_name, orig_modules)
-            for blocked_name in blocked:
-                if not _save_and_block_module(blocked_name, orig_modules):
-                    names_to_remove.append(blocked_name)
-            fresh_module = importlib.import_module(name)
-        except ImportError:
-            fresh_module = None
-        finally:
-            for orig_name, module in orig_modules.items():
-                sys.modules[orig_name] = module
-            for name_to_remove in names_to_remove:
-                del sys.modules[name_to_remove]
-        return fresh_module
-
-
-def get_attribute(obj, name):
-    """Get an attribute, raising SkipTest if AttributeError is raised."""
-    try:
-        attribute = getattr(obj, name)
-    except AttributeError:
-        raise unittest.SkipTest("module %s has no attribute %s" % (
-            obj.__name__, name))
-    else:
-        return attribute
-
-
-verbose = 1              # Flag set to 0 by regrtest.py
-use_resources = None     # Flag set to [] by regrtest.py
-max_memuse = 0           # Disable bigmem tests (they will still be run with
-                         # small sizes, to make sure they work.)
-real_max_memuse = 0
-
-# _original_stdout is meant to hold stdout at the time regrtest began.
-# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
-# The point is to have some flavor of stdout the user can actually see.
-_original_stdout = None
-def record_original_stdout(stdout):
-    global _original_stdout
-    _original_stdout = stdout
-
-def get_original_stdout():
-    return _original_stdout or sys.stdout
-
-def unload(name):
-    try:
-        del sys.modules[name]
-    except KeyError:
-        pass
-
-def _force_run(path, func, *args):
-    try:
-        return func(*args)
-    except EnvironmentError as err:
-        if verbose >= 2:
-            print('%s: %s' % (err.__class__.__name__, err))
-            print('re-run %s%r' % (func.__name__, args))
-        os.chmod(path, stat.S_IRWXU)
-        return func(*args)
-
-if sys.platform.startswith("win"):
-    def _waitfor(func, pathname, waitall=False):
-        # Perform the operation
-        func(pathname)
-        # Now setup the wait loop
-        if waitall:
-            dirname = pathname
-        else:
-            dirname, name = os.path.split(pathname)
-            dirname = dirname or '.'
-        # Check for `pathname` to be removed from the filesystem.
-        # The exponential backoff of the timeout amounts to a total
-        # of ~1 second after which the deletion is probably an error
-        # anyway.
-        # Testing on an i7@4.3GHz shows that usually only 1 iteration is
-        # required when contention occurs.
-        timeout = 0.001
-        while timeout < 1.0:
-            # Note we are only testing for the existence of the file(s) in
-            # the contents of the directory regardless of any security or
-            # access rights.  If we have made it this far, we have sufficient
-            # permissions to do that much using Python's equivalent of the
-            # Windows API FindFirstFile.
-            # Other Windows APIs can fail or give incorrect results when
-            # dealing with files that are pending deletion.
-            L = os.listdir(dirname)
-            if not (L if waitall else name in L):
-                return
-            # Increase the timeout and try again
-            time.sleep(timeout)
-            timeout *= 2
-        warnings.warn('tests may fail, delete still pending for ' + pathname,
-                      RuntimeWarning, stacklevel=4)
-
-    def _unlink(filename):
-        _waitfor(os.unlink, filename)
-
-    def _rmdir(dirname):
-        _waitfor(os.rmdir, dirname)
-
-    def _rmtree(path):
-        def _rmtree_inner(path):
-            for name in _force_run(path, os.listdir, path):
-                fullname = os.path.join(path, name)
-                if os.path.isdir(fullname):
-                    _waitfor(_rmtree_inner, fullname, waitall=True)
-                    _force_run(fullname, os.rmdir, fullname)
-                else:
-                    _force_run(fullname, os.unlink, fullname)
-        _waitfor(_rmtree_inner, path, waitall=True)
-        _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
-else:
-    _unlink = os.unlink
-    _rmdir = os.rmdir
-
-    def _rmtree(path):
-        try:
-            shutil.rmtree(path)
-            return
-        except EnvironmentError:
-            pass
-
-        def _rmtree_inner(path):
-            for name in _force_run(path, os.listdir, path):
-                fullname = os.path.join(path, name)
-                try:
-                    mode = os.lstat(fullname).st_mode
-                except EnvironmentError:
-                    mode = 0
-                if stat.S_ISDIR(mode):
-                    _rmtree_inner(fullname)
-                    _force_run(path, os.rmdir, fullname)
-                else:
-                    _force_run(path, os.unlink, fullname)
-        _rmtree_inner(path)
-        os.rmdir(path)
-
-def unlink(filename):
-    try:
-        _unlink(filename)
-    except OSError:
-        pass
-
-def rmdir(dirname):
-    try:
-        _rmdir(dirname)
-    except OSError as error:
-        # The directory need not exist.
-        if error.errno != errno.ENOENT:
-            raise
-
-def rmtree(path):
-    try:
-        _rmtree(path)
-    except OSError, e:
-        # Unix returns ENOENT, Windows returns ESRCH.
-        if e.errno not in (errno.ENOENT, errno.ESRCH):
-            raise
-
-def forget(modname):
-    '''"Forget" a module was ever imported by removing it from sys.modules and
-    deleting any .pyc and .pyo files.'''
-    unload(modname)
-    for dirname in sys.path:
-        unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
-        # Deleting the .pyo file cannot be within the 'try' for the .pyc since
-        # the chance exists that there is no .pyc (and thus the 'try' statement
-        # is exited) but there is a .pyo file.
-        unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
-
-# Check whether a gui is actually available
-def _is_gui_available():
-    if hasattr(_is_gui_available, 'result'):
-        return _is_gui_available.result
-    reason = None
-    if sys.platform.startswith('win'):
-        # if Python is running as a service (such as the buildbot service),
-        # gui interaction may be disallowed
-        import ctypes
-        import ctypes.wintypes
-        UOI_FLAGS = 1
-        WSF_VISIBLE = 0x0001
-        class USEROBJECTFLAGS(ctypes.Structure):
-            _fields_ = [("fInherit", ctypes.wintypes.BOOL),
-                        ("fReserved", ctypes.wintypes.BOOL),
-                        ("dwFlags", ctypes.wintypes.DWORD)]
-        dll = ctypes.windll.user32
-        h = dll.GetProcessWindowStation()
-        if not h:
-            raise ctypes.WinError()
-        uof = USEROBJECTFLAGS()
-        needed = ctypes.wintypes.DWORD()
-        res = dll.GetUserObjectInformationW(h,
-            UOI_FLAGS,
-            ctypes.byref(uof),
-            ctypes.sizeof(uof),
-            ctypes.byref(needed))
-        if not res:
-            raise ctypes.WinError()
-        if not bool(uof.dwFlags & WSF_VISIBLE):
-            reason = "gui not available (WSF_VISIBLE flag not set)"
-    elif sys.platform == 'darwin':
-        # The Aqua Tk implementations on OS X can abort the process if
-        # being called in an environment where a window server connection
-        # cannot be made, for instance when invoked by a buildbot or ssh
-        # process not running under the same user id as the current console
-        # user.  To avoid that, raise an exception if the window manager
-        # connection is not available.
-        from ctypes import cdll, c_int, pointer, Structure
-        from ctypes.util import find_library
-
-        app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
-
-        if app_services.CGMainDisplayID() == 0:
-            reason = "gui tests cannot run without OS X window manager"
-        else:
-            class ProcessSerialNumber(Structure):
-                _fields_ = [("highLongOfPSN", c_int),
-                            ("lowLongOfPSN", c_int)]
-            psn = ProcessSerialNumber()
-            psn_p = pointer(psn)
-            if (  (app_services.GetCurrentProcess(psn_p) < 0) or
-                  (app_services.SetFrontProcess(psn_p) < 0) ):
-                reason = "cannot run without OS X gui process"
-
-    # check on every platform whether tkinter can actually do anything
-    if not reason:
-        try:
-            from Tkinter import Tk
-            root = Tk()
-            root.withdraw()
-            root.update()
-            root.destroy()
-        except Exception as e:
-            err_string = str(e)
-            if len(err_string) > 50:
-                err_string = err_string[:50] + ' [...]'
-            reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
-                                                           err_string)
-
-    _is_gui_available.reason = reason
-    _is_gui_available.result = not reason
-
-    return _is_gui_available.result
-
-def is_resource_enabled(resource):
-    """Test whether a resource is enabled.
-
-    Known resources are set by regrtest.py.  If not running under regrtest.py,
-    all resources are assumed enabled unless use_resources has been set.
-    """
-    return use_resources is None or resource in use_resources
-
-def requires(resource, msg=None):
-    """Raise ResourceDenied if the specified resource is not available."""
-    if not is_resource_enabled(resource):
-        if msg is None:
-            msg = "Use of the `%s' resource not enabled" % resource
-        raise ResourceDenied(msg)
-    if resource == 'gui' and not _is_gui_available():
-        raise ResourceDenied(_is_gui_available.reason)
-
-def requires_mac_ver(*min_version):
-    """Decorator raising SkipTest if the OS is Mac OS X and the OS X
-    version if less than min_version.
-
-    For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
-    is lesser than 10.5.
-    """
-    def decorator(func):
-        @functools.wraps(func)
-        def wrapper(*args, **kw):
-            if sys.platform == 'darwin':
-                version_txt = platform.mac_ver()[0]
-                try:
-                    version = tuple(map(int, version_txt.split('.')))
-                except ValueError:
-                    pass
-                else:
-                    if version < min_version:
-                        min_version_txt = '.'.join(map(str, min_version))
-                        raise unittest.SkipTest(
-                            "Mac OS X %s or higher required, not %s"
-                            % (min_version_txt, version_txt))
-            return func(*args, **kw)
-        wrapper.min_version = min_version
-        return wrapper
-    return decorator
-
-
-# Don't use "localhost", since resolving it uses the DNS under recent
-# Windows versions (see issue #18792).
-HOST = "127.0.0.1"
-HOSTv6 = "::1"
-
-
-def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
-    """Returns an unused port that should be suitable for binding.  This is
-    achieved by creating a temporary socket with the same family and type as
-    the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
-    the specified host address (defaults to 0.0.0.0) with the port set to 0,
-    eliciting an unused ephemeral port from the OS.  The temporary socket is
-    then closed and deleted, and the ephemeral port is returned.
-
-    Either this method or bind_port() should be used for any tests where a
-    server socket needs to be bound to a particular port for the duration of
-    the test.  Which one to use depends on whether the calling code is creating
-    a python socket, or if an unused port needs to be provided in a constructor
-    or passed to an external program (i.e. the -accept argument to openssl's
-    s_server mode).  Always prefer bind_port() over find_unused_port() where
-    possible.  Hard coded ports should *NEVER* be used.  As soon as a server
-    socket is bound to a hard coded port, the ability to run multiple instances
-    of the test simultaneously on the same host is compromised, which makes the
-    test a ticking time bomb in a buildbot environment. On Unix buildbots, this
-    may simply manifest as a failed test, which can be recovered from without
-    intervention in most cases, but on Windows, the entire python process can
-    completely and utterly wedge, requiring someone to log in to the buildbot
-    and manually kill the affected process.
-
-    (This is easy to reproduce on Windows, unfortunately, and can be traced to
-    the SO_REUSEADDR socket option having different semantics on Windows versus
-    Unix/Linux.  On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
-    listen and then accept connections on identical host/ports.  An EADDRINUSE
-    socket.error will be raised at some point (depending on the platform and
-    the order bind and listen were called on each socket).
-
-    However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
-    will ever be raised when attempting to bind two identical host/ports. When
-    accept() is called on each socket, the second caller's process will steal
-    the port from the first caller, leaving them both in an awkwardly wedged
-    state where they'll no longer respond to any signals or graceful kills, and
-    must be forcibly killed via OpenProcess()/TerminateProcess().
-
-    The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
-    instead of SO_REUSEADDR, which effectively affords the same semantics as
-    SO_REUSEADDR on Unix.  Given the propensity of Unix developers in the Open
-    Source world compared to Windows ones, this is a common mistake.  A quick
-    look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
-    openssl.exe is called with the 's_server' option, for example. See
-    http://bugs.python.org/issue2550 for more info.  The following site also
-    has a very thorough description about the implications of both REUSEADDR
-    and EXCLUSIVEADDRUSE on Windows:
-    http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
-
-    XXX: although this approach is a vast improvement on previous attempts to
-    elicit unused ports, it rests heavily on the assumption that the ephemeral
-    port returned to us by the OS won't immediately be dished back out to some
-    other process when we close and delete our temporary socket but before our
-    calling code has a chance to bind the returned port.  We can deal with this
-    issue if/when we come across it."""
-    tempsock = socket.socket(family, socktype)
-    port = bind_port(tempsock)
-    tempsock.close()
-    del tempsock
-    return port
-
-def bind_port(sock, host=HOST):
-    """Bind the socket to a free port and return the port number.  Relies on
-    ephemeral ports in order to ensure we are using an unbound port.  This is
-    important as many tests may be running simultaneously, especially in a
-    buildbot environment.  This method raises an exception if the sock.family
-    is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
-    or SO_REUSEPORT set on it.  Tests should *never* set these socket options
-    for TCP/IP sockets.  The only case for setting these options is testing
-    multicasting via multiple UDP sockets.
-
-    Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
-    on Windows), it will be set on the socket.  This will prevent anyone else
-    from bind()'ing to our host/port for the duration of the test.
-    """
-    if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
-        if hasattr(socket, 'SO_REUSEADDR'):
-            if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
-                raise TestFailed("tests should never set the SO_REUSEADDR "   \
-                                 "socket option on TCP/IP sockets!")
-        if hasattr(socket, 'SO_REUSEPORT'):
-            try:
-                if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
-                    raise TestFailed("tests should never set the SO_REUSEPORT "   \
-                                     "socket option on TCP/IP sockets!")
-            except EnvironmentError:
-                # Python's socket module was compiled using modern headers
-                # thus defining SO_REUSEPORT but this process is running
-                # under an older kernel that does not support SO_REUSEPORT.
-                pass
-        if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
-            sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
-
-    sock.bind((host, 0))
-    port = sock.getsockname()[1]
-    return port
-
-def _is_ipv6_enabled():
-    """Check whether IPv6 is enabled on this host."""
-    if socket.has_ipv6:
-        sock = None
-        try:
-            sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
-            sock.bind((HOSTv6, 0))
-            return True
-        except socket.error:
-            pass
-        finally:
-            if sock:
-                sock.close()
-    return False
-
-IPV6_ENABLED = _is_ipv6_enabled()
-
-def system_must_validate_cert(f):
-    """Skip the test on TLS certificate validation failures."""
-    @functools.wraps(f)
-    def dec(*args, **kwargs):
-        try:
-            f(*args, **kwargs)
-        except IOError as e:
-            if "CERTIFICATE_VERIFY_FAILED" in str(e):
-                raise unittest.SkipTest("system does not contain "
-                                        "necessary certificates")
-            raise
-    return dec
-
-FUZZ = 1e-6
-
-def fcmp(x, y): # fuzzy comparison function
-    if isinstance(x, float) or isinstance(y, float):
-        try:
-            fuzz = (abs(x) + abs(y)) * FUZZ
-            if abs(x-y) <= fuzz:
-                return 0
-        except:
-            pass
-    elif type(x) == type(y) and isinstance(x, (tuple, list)):
-        for i in range(min(len(x), len(y))):
-            outcome = fcmp(x[i], y[i])
-            if outcome != 0:
-                return outcome
-        return (len(x) > len(y)) - (len(x) < len(y))
-    return (x > y) - (x < y)
-
-
-# A constant likely larger than the underlying OS pipe buffer size, to
-# make writes blocking.
-# Windows limit seems to be around 512 B, and many Unix kernels have a
-# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
-# (see issue #17835 for a discussion of this number).
-PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
-
-# A constant likely larger than the underlying OS socket buffer size, to make
-# writes blocking.
-# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
-# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
-# for a discussion of this number).
-SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
-
-is_jython = sys.platform.startswith('java')
-
-try:
-    unicode
-    have_unicode = True
-except NameError:
-    have_unicode = False
-
-requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
-
-def u(s):
-    return unicode(s, 'unicode-escape')
-
-# FS_NONASCII: non-ASCII Unicode character encodable by
-# sys.getfilesystemencoding(), or None if there is no such character.
-FS_NONASCII = None
-if have_unicode:
-    for character in (
-        # First try printable and common characters to have a readable filename.
-        # For each character, the encoding list are just example of encodings able
-        # to encode the character (the list is not exhaustive).
-
-        # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
-        unichr(0x00E6),
-        # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
-        unichr(0x0130),
-        # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
-        unichr(0x0141),
-        # U+03C6 (Greek Small Letter Phi): cp1253
-        unichr(0x03C6),
-        # U+041A (Cyrillic Capital Letter Ka): cp1251
-        unichr(0x041A),
-        # U+05D0 (Hebrew Letter Alef): Encodable to cp424
-        unichr(0x05D0),
-        # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
-        unichr(0x060C),
-        # U+062A (Arabic Letter Teh): cp720
-        unichr(0x062A),
-        # U+0E01 (Thai Character Ko Kai): cp874
-        unichr(0x0E01),
-
-        # Then try more "special" characters. "special" because they may be
-        # interpreted or displayed differently depending on the exact locale
-        # encoding and the font.
-
-        # U+00A0 (No-Break Space)
-        unichr(0x00A0),
-        # U+20AC (Euro Sign)
-        unichr(0x20AC),
-    ):
-        try:
-            character.encode(sys.getfilesystemencoding())\
-                     .decode(sys.getfilesystemencoding())
-        except UnicodeError:
-            pass
-        else:
-            FS_NONASCII = character
-            break
-
-# Filename used for testing
-if os.name == 'java':
-    # Jython disallows @ in module names
-    TESTFN = '$test'
-elif os.name == 'riscos':
-    TESTFN = 'testfile'
-else:
-    TESTFN = '@test'
-    # Unicode name only used if TEST_FN_ENCODING exists for the platform.
-    if have_unicode:
-        # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
-        # TESTFN_UNICODE is a filename that can be encoded using the
-        # file system encoding, but *not* with the default (ascii) encoding
-        if isinstance('', unicode):
-            # python -U
-            # XXX perhaps unicode() should accept Unicode strings?
-            TESTFN_UNICODE = "@test-\xe0\xf2"
-        else:
-            # 2 latin characters.
-            TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
-        TESTFN_ENCODING = sys.getfilesystemencoding()
-        # TESTFN_UNENCODABLE is a filename that should *not* be
-        # able to be encoded by *either* the default or filesystem encoding.
-        # This test really only makes sense on Windows NT platforms
-        # which have special Unicode support in posixmodule.
-        if (not hasattr(sys, "getwindowsversion") or
-                sys.getwindowsversion()[3] < 2): #  0=win32s or 1=9x/ME
-            TESTFN_UNENCODABLE = None
-        else:
-            # Japanese characters (I think - from bug 846133)
-            TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
-            try:
-                # XXX - Note - should be using TESTFN_ENCODING here - but for
-                # Windows, "mbcs" currently always operates as if in
-                # errors=ignore' mode - hence we get '?' characters rather than
-                # the exception.  'Latin1' operates as we expect - ie, fails.
-                # See [ 850997 ] mbcs encoding ignores errors
-                TESTFN_UNENCODABLE.encode("Latin1")
-            except UnicodeEncodeError:
-                pass
-            else:
-                print \
-                'WARNING: The filename %r CAN be encoded by the filesystem.  ' \
-                'Unicode filename tests may not be effective' \
-                % TESTFN_UNENCODABLE
-
-
-# Disambiguate TESTFN for parallel testing, while letting it remain a valid
-# module name.
-TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
-
-# Save the initial cwd
-SAVEDCWD = os.getcwd()
-
-@contextlib.contextmanager
-def change_cwd(path, quiet=False):
-    """Return a context manager that changes the current working directory.
-
-    Arguments:
-
-      path: the directory to use as the temporary current working directory.
-
-      quiet: if False (the default), the context manager raises an exception
-        on error.  Otherwise, it issues only a warning and keeps the current
-        working directory the same.
-
-    """
-    saved_dir = os.getcwd()
-    try:
-        os.chdir(path)
-    except OSError:
-        if not quiet:
-            raise
-        warnings.warn('tests may fail, unable to change CWD to: ' + path,
-                      RuntimeWarning, stacklevel=3)
-    try:
-        yield os.getcwd()
-    finally:
-        os.chdir(saved_dir)
-
-
-@contextlib.contextmanager
-def temp_cwd(name='tempcwd', quiet=False):
-    """
-    Context manager that creates a temporary directory and set it as CWD.
-
-    The new CWD is created in the current directory and it's named *name*.
-    If *quiet* is False (default) and it's not possible to create or change
-    the CWD, an error is raised.  If it's True, only a warning is raised
-    and the original CWD is used.
-    """
-    if (have_unicode and isinstance(name, unicode) and
-        not os.path.supports_unicode_filenames):
-        try:
-            name = name.encode(sys.getfilesystemencoding() or 'ascii')
-        except UnicodeEncodeError:
-            if not quiet:
-                raise unittest.SkipTest('unable to encode the cwd name with '
-                                        'the filesystem encoding.')
-    saved_dir = os.getcwd()
-    is_temporary = False
-    try:
-        os.mkdir(name)
-        os.chdir(name)
-        is_temporary = True
-    except OSError:
-        if not quiet:
-            raise
-        warnings.warn('tests may fail, unable to change the CWD to ' + name,
-                      RuntimeWarning, stacklevel=3)
-    try:
-        yield os.getcwd()
-    finally:
-        os.chdir(saved_dir)
-        if is_temporary:
-            rmtree(name)
-
-
-def findfile(file, here=__file__, subdir=None):
-    """Try to find a file on sys.path and the working directory.  If it is not
-    found the argument passed to the function is returned (this does not
-    necessarily signal failure; could still be the legitimate path)."""
-    if os.path.isabs(file):
-        return file
-    if subdir is not None:
-        file = os.path.join(subdir, file)
-    path = sys.path
-    path = [os.path.dirname(here)] + path
-    for dn in path:
-        fn = os.path.join(dn, file)
-        if os.path.exists(fn): return fn
-    return file
-
-def sortdict(dict):
-    "Like repr(dict), but in sorted order."
-    items = dict.items()
-    items.sort()
-    reprpairs = ["%r: %r" % pair for pair in items]
-    withcommas = ", ".join(reprpairs)
-    return "{%s}" % withcommas
-
-def make_bad_fd():
-    """
-    Create an invalid file descriptor by opening and closing a file and return
-    its fd.
-    """
-    file = open(TESTFN, "wb")
-    try:
-        return file.fileno()
-    finally:
-        file.close()
-        unlink(TESTFN)
-
-def check_syntax_error(testcase, statement):
-    testcase.assertRaises(SyntaxError, compile, statement,
-                          '<test string>', 'exec')
-
-def open_urlresource(url, check=None):
-    import urlparse, urllib2
-
-    filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
-
-    fn = os.path.join(os.path.dirname(__file__), "data", filename)
-
-    def check_valid_file(fn):
-        f = open(fn)
-        if check is None:
-            return f
-        elif check(f):
-            f.seek(0)
-            return f
-        f.close()
-
-    if os.path.exists(fn):
-        f = check_valid_file(fn)
-        if f is not None:
-            return f
-        unlink(fn)
-
-    # Verify the requirement before downloading the file
-    requires('urlfetch')
-
-    print >> get_original_stdout(), '\tfetching %s ...' % url
-    f = urllib2.urlopen(url, timeout=15)
-    try:
-        with open(fn, "wb") as out:
-            s = f.read()
-            while s:
-                out.write(s)
-                s = f.read()
-    finally:
-        f.close()
-
-    f = check_valid_file(fn)
-    if f is not None:
-        return f
-    raise TestFailed('invalid resource "%s"' % fn)
-
-
-class WarningsRecorder(object):
-    """Convenience wrapper for the warnings list returned on
-       entry to the warnings.catch_warnings() context manager.
-    """
-    def __init__(self, warnings_list):
-        self._warnings = warnings_list
-        self._last = 0
-
-    def __getattr__(self, attr):
-        if len(self._warnings) > self._last:
-            return getattr(self._warnings[-1], attr)
-        elif attr in warnings.WarningMessage._WARNING_DETAILS:
-            return None
-        raise AttributeError("%r has no attribute %r" % (self, attr))
-
-    @property
-    def warnings(self):
-        return self._warnings[self._last:]
-
-    def reset(self):
-        self._last = len(self._warnings)
-
-
-def _filterwarnings(filters, quiet=False):
-    """Catch the warnings, then check if all the expected
-    warnings have been raised and re-raise unexpected warnings.
-    If 'quiet' is True, only re-raise the unexpected warnings.
-    """
-    # Clear the warning registry of the calling module
-    # in order to re-raise the warnings.
-    frame = sys._getframe(2)
-    registry = frame.f_globals.get('__warningregistry__')
-    if registry:
-        registry.clear()
-    with warnings.catch_warnings(record=True) as w:
-        # Set filter "always" to record all warnings.  Because
-        # test_warnings swap the module, we need to look up in
-        # the sys.modules dictionary.
-        sys.modules['warnings'].simplefilter("always")
-        yield WarningsRecorder(w)
-    # Filter the recorded warnings
-    reraise = [warning.message for warning in w]
-    missing = []
-    for msg, cat in filters:
-        seen = False
-        for exc in reraise[:]:
-            message = str(exc)
-            # Filter out the matching messages
-            if (re.match(msg, message, re.I) and
-                issubclass(exc.__class__, cat)):
-                seen = True
-                reraise.remove(exc)
-        if not seen and not quiet:
-            # This filter caught nothing
-            missing.append((msg, cat.__name__))
-    if reraise:
-        raise AssertionError("unhandled warning %r" % reraise[0])
-    if missing:
-        raise AssertionError("filter (%r, %s) did not catch any warning" %
-                             missing[0])
-
-
-@contextlib.contextmanager
-def check_warnings(*filters, **kwargs):
-    """Context manager to silence warnings.
-
-    Accept 2-tuples as positional arguments:
-        ("message regexp", WarningCategory)
-
-    Optional argument:
-     - if 'quiet' is True, it does not fail if a filter catches nothing
-        (default True without argument,
-         default False if some filters are defined)
-
-    Without argument, it defaults to:
-        check_warnings(("", Warning), quiet=True)
-    """
-    quiet = kwargs.get('quiet')
-    if not filters:
-        filters = (("", Warning),)
-        # Preserve backward compatibility
-        if quiet is None:
-            quiet = True
-    return _filterwarnings(filters, quiet)
-
-
-@contextlib.contextmanager
-def check_py3k_warnings(*filters, **kwargs):
-    """Context manager to silence py3k warnings.
-
-    Accept 2-tuples as positional arguments:
-        ("message regexp", WarningCategory)
-
-    Optional argument:
-     - if 'quiet' is True, it does not fail if a filter catches nothing
-        (default False)
-
-    Without argument, it defaults to:
-        check_py3k_warnings(("", DeprecationWarning), quiet=False)
-    """
-    if sys.py3kwarning:
-        if not filters:
-            filters = (("", DeprecationWarning),)
-    else:
-        # It should not raise any py3k warning
-        filters = ()
-    return _filterwarnings(filters, kwargs.get('quiet'))
-
-
-class CleanImport(object):
-    """Context manager to force import to return a new module reference.
-
-    This is useful for testing module-level behaviours, such as
-    the emission of a DeprecationWarning on import.
-
-    Use like this:
-
-        with CleanImport("foo"):
-            importlib.import_module("foo") # new reference
-    """
-
-    def __init__(self, *module_names):
-        self.original_modules = sys.modules.copy()
-        for module_name in module_names:
-            if module_name in sys.modules:
-                module = sys.modules[module_name]
-                # It is possible that module_name is just an alias for
-                # another module (e.g. stub for modules renamed in 3.x).
-                # In that case, we also need delete the real module to clear
-                # the import cache.
-                if module.__name__ != module_name:
-                    del sys.modules[module.__name__]
-                del sys.modules[module_name]
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *ignore_exc):
-        sys.modules.update(self.original_modules)
-
-
-class EnvironmentVarGuard(UserDict.DictMixin):
-
-    """Class to help protect the environment variable properly.  Can be used as
-    a context manager."""
-
-    def __init__(self):
-        self._environ = os.environ
-        self._changed = {}
-
-    def __getitem__(self, envvar):
-        return self._environ[envvar]
-
-    def __setitem__(self, envvar, value):
-        # Remember the initial value on the first access
-        if envvar not in self._changed:
-            self._changed[envvar] = self._environ.get(envvar)
-        self._environ[envvar] = value
-
-    def __delitem__(self, envvar):
-        # Remember the initial value on the first access
-        if envvar not in self._changed:
-            self._changed[envvar] = self._environ.get(envvar)
-        if envvar in self._environ:
-            del self._environ[envvar]
-
-    def keys(self):
-        return self._environ.keys()
-
-    def set(self, envvar, value):
-        self[envvar] = value
-
-    def unset(self, envvar):
-        del self[envvar]
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *ignore_exc):
-        for (k, v) in self._changed.items():
-            if v is None:
-                if k in self._environ:
-                    del self._environ[k]
-            else:
-                self._environ[k] = v
-        os.environ = self._environ
-
-
-class DirsOnSysPath(object):
-    """Context manager to temporarily add directories to sys.path.
-
-    This makes a copy of sys.path, appends any directories given
-    as positional arguments, then reverts sys.path to the copied
-    settings when the context ends.
-
-    Note that *all* sys.path modifications in the body of the
-    context manager, including replacement of the object,
-    will be reverted at the end of the block.
-    """
-
-    def __init__(self, *paths):
-        self.original_value = sys.path[:]
-        self.original_object = sys.path
-        sys.path.extend(paths)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *ignore_exc):
-        sys.path = self.original_object
-        sys.path[:] = self.original_value
-
-
-class TransientResource(object):
-
-    """Raise ResourceDenied if an exception is raised while the context manager
-    is in effect that matches the specified exception and attributes."""
-
-    def __init__(self, exc, **kwargs):
-        self.exc = exc
-        self.attrs = kwargs
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, type_=None, value=None, traceback=None):
-        """If type_ is a subclass of self.exc and value has attributes matching
-        self.attrs, raise ResourceDenied.  Otherwise let the exception
-        propagate (if any)."""
-        if type_ is not None and issubclass(self.exc, type_):
-            for attr, attr_value in self.attrs.iteritems():
-                if not hasattr(value, attr):
-                    break
-                if getattr(value, attr) != attr_value:
-                    break
-            else:
-                raise ResourceDenied("an optional resource is not available")
-
-
-@contextlib.contextmanager
-def transient_internet(resource_name, timeout=30.0, errnos=()):
-    """Return a context manager that raises ResourceDenied when various issues
-    with the Internet connection manifest themselves as exceptions."""
-    default_errnos = [
-        ('ECONNREFUSED', 111),
-        ('ECONNRESET', 104),
-        ('EHOSTUNREACH', 113),
-        ('ENETUNREACH', 101),
-        ('ETIMEDOUT', 110),
-    ]
-    default_gai_errnos = [
-        ('EAI_AGAIN', -3),
-        ('EAI_FAIL', -4),
-        ('EAI_NONAME', -2),
-        ('EAI_NODATA', -5),
-        # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
-        # implementation actually returns WSANO_DATA i.e. 11004.
-        ('WSANO_DATA', 11004),
-    ]
-
-    denied = ResourceDenied("Resource '%s' is not available" % resource_name)
-    captured_errnos = errnos
-    gai_errnos = []
-    if not captured_errnos:
-        captured_errnos = [getattr(errno, name, num)
-                           for (name, num) in default_errnos]
-        gai_errnos = [getattr(socket, name, num)
-                      for (name, num) in default_gai_errnos]
-
-    def filter_error(err):
-        n = getattr(err, 'errno', None)
-        if (isinstance(err, socket.timeout) or
-            (isinstance(err, socket.gaierror) and n in gai_errnos) or
-            n in captured_errnos):
-            if not verbose:
-                sys.stderr.write(denied.args[0] + "\n")
-            raise denied
-
-    old_timeout = socket.getdefaulttimeout()
-    try:
-        if timeout is not None:
-            socket.setdefaulttimeout(timeout)
-        yield
-    except IOError as err:
-        # urllib can wrap original socket errors multiple times (!), we must
-        # unwrap to get at the original error.
-        while True:
-            a = err.args
-            if len(a) >= 1 and isinstance(a[0], IOError):
-                err = a[0]
-            # The error can also be wrapped as args[1]:
-            #    except socket.error as msg:
-            #        raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
-            elif len(a) >= 2 and isinstance(a[1], IOError):
-                err = a[1]
-            else:
-                break
-        filter_error(err)
-        raise
-    # XXX should we catch generic exceptions and look for their
-    # __cause__ or __context__?
-    finally:
-        socket.setdefaulttimeout(old_timeout)
-
-
-@contextlib.contextmanager
-def captured_output(stream_name):
-    """Return a context manager used by captured_stdout and captured_stdin
-    that temporarily replaces the sys stream *stream_name* with a StringIO."""
-    import StringIO
-    orig_stdout = getattr(sys, stream_name)
-    setattr(sys, stream_name, StringIO.StringIO())
-    try:
-        yield getattr(sys, stream_name)
-    finally:
-        setattr(sys, stream_name, orig_stdout)
-
-def captured_stdout():
-    """Capture the output of sys.stdout:
-
-       with captured_stdout() as s:
-           print "hello"
-       self.assertEqual(s.getvalue(), "hello")
-    """
-    return captured_output("stdout")
-
-def captured_stderr():
-    return captured_output("stderr")
-
-def captured_stdin():
-    return captured_output("stdin")
-
-def gc_collect():
-    """Force as many objects as possible to be collected.
-
-    In non-CPython implementations of Python, this is needed because timely
-    deallocation is not guaranteed by the garbage collector.  (Even in CPython
-    this can be the case in case of reference cycles.)  This means that __del__
-    methods may be called later than expected and weakrefs may remain alive for
-    longer than expected.  This function tries its best to force all garbage
-    objects to disappear.
-    """
-    gc.collect()
-    if is_jython:
-        time.sleep(0.1)
-    gc.collect()
-    gc.collect()
-
-
-_header = '2P'
-if hasattr(sys, "gettotalrefcount"):
-    _header = '2P' + _header
-_vheader = _header + 'P'
-
-def calcobjsize(fmt):
-    return struct.calcsize(_header + fmt + '0P')
-
-def calcvobjsize(fmt):
-    return struct.calcsize(_vheader + fmt + '0P')
-
-
-_TPFLAGS_HAVE_GC = 1<<14
-_TPFLAGS_HEAPTYPE = 1<<9
-
-def check_sizeof(test, o, size):
-    import _testcapi
-    result = sys.getsizeof(o)
-    # add GC header size
-    if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
-        ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
-        size += _testcapi.SIZEOF_PYGC_HEAD
-    msg = 'wrong size for %s: got %d, expected %d' \
-            % (type(o), result, size)
-    test.assertEqual(result, size, msg)
-
-
-#=======================================================================
-# Decorator for running a function in a different locale, correctly resetting
-# it afterwards.
-
-def run_with_locale(catstr, *locales):
-    def decorator(func):
-        def inner(*args, **kwds):
-            try:
-                import locale
-                category = getattr(locale, catstr)
-                orig_locale = locale.setlocale(category)
-            except AttributeError:
-                # if the test author gives us an invalid category string
-                raise
-            except:
-                # cannot retrieve original locale, so do nothing
-                locale = orig_locale = None
-            else:
-                for loc in locales:
-                    try:
-                        locale.setlocale(category, loc)
-                        break
-                    except:
-                        pass
-
-            # now run the function, resetting the locale on exceptions
-            try:
-                return func(*args, **kwds)
-            finally:
-                if locale and orig_locale:
-                    locale.setlocale(category, orig_locale)
-        inner.func_name = func.func_name
-        inner.__doc__ = func.__doc__
-        return inner
-    return decorator
-
-#=======================================================================
-# Decorator for running a function in a specific timezone, correctly
-# resetting it afterwards.
-
-def run_with_tz(tz):
-    def decorator(func):
-        def inner(*args, **kwds):
-            try:
-                tzset = time.tzset
-            except AttributeError:
-                raise unittest.SkipTest("tzset required")
-            if 'TZ' in os.environ:
-                orig_tz = os.environ['TZ']
-            else:
-                orig_tz = None
-            os.environ['TZ'] = tz
-            tzset()
-
-            # now run the function, resetting the tz on exceptions
-            try:
-                return func(*args, **kwds)
-            finally:
-                if orig_tz is None:
-                    del os.environ['TZ']
-                else:
-                    os.environ['TZ'] = orig_tz
-                time.tzset()
-
-        inner.__name__ = func.__name__
-        inner.__doc__ = func.__doc__
-        return inner
-    return decorator
-
-#=======================================================================
-# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
-
-# Some handy shorthands. Note that these are used for byte-limits as well
-# as size-limits, in the various bigmem tests
-_1M = 1024*1024
-_1G = 1024 * _1M
-_2G = 2 * _1G
-_4G = 4 * _1G
-
-MAX_Py_ssize_t = sys.maxsize
-
-def set_memlimit(limit):
-    global max_memuse
-    global real_max_memuse
-    sizes = {
-        'k': 1024,
-        'm': _1M,
-        'g': _1G,
-        't': 1024*_1G,
-    }
-    m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
-                 re.IGNORECASE | re.VERBOSE)
-    if m is None:
-        raise ValueError('Invalid memory limit %r' % (limit,))
-    memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
-    real_max_memuse = memlimit
-    if memlimit > MAX_Py_ssize_t:
-        memlimit = MAX_Py_ssize_t
-    if memlimit < _2G - 1:
-        raise ValueError('Memory limit %r too low to be useful' % (limit,))
-    max_memuse = memlimit
-
-def bigmemtest(minsize, memuse, overhead=5*_1M):
-    """Decorator for bigmem tests.
-
-    'minsize' is the minimum useful size for the test (in arbitrary,
-    test-interpreted units.) 'memuse' is the number of 'bytes per size' for
-    the test, or a good estimate of it. 'overhead' specifies fixed overhead,
-    independent of the testsize, and defaults to 5Mb.
-
-    The decorator tries to guess a good value for 'size' and passes it to
-    the decorated test function. If minsize * memuse is more than the
-    allowed memory use (as defined by max_memuse), the test is skipped.
-    Otherwise, minsize is adjusted upward to use up to max_memuse.
-    """
-    def decorator(f):
-        def wrapper(self):
-            if not max_memuse:
-                # If max_memuse is 0 (the default),
-                # we still want to run the tests with size set to a few kb,
-                # to make sure they work. We still want to avoid using
-                # too much memory, though, but we do that noisily.
-                maxsize = 5147
-                self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
-            else:
-                maxsize = int((max_memuse - overhead) / memuse)
-                if maxsize < minsize:
-                    # Really ought to print 'test skipped' or something
-                    if verbose:
-                        sys.stderr.write("Skipping %s because of memory "
-                                         "constraint\n" % (f.__name__,))
-                    return
-                # Try to keep some breathing room in memory use
-                maxsize = max(maxsize - 50 * _1M, minsize)
-            return f(self, maxsize)
-        wrapper.minsize = minsize
-        wrapper.memuse = memuse
-        wrapper.overhead = overhead
-        return wrapper
-    return decorator
-
-def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
-    def decorator(f):
-        def wrapper(self):
-            if not real_max_memuse:
-                maxsize = 5147
-            else:
-                maxsize = size
-
-            if ((real_max_memuse or not dry_run)
-                and real_max_memuse < maxsize * memuse):
-                if verbose:
-                    sys.stderr.write("Skipping %s because of memory "
-                                     "constraint\n" % (f.__name__,))
-                return
-
-            return f(self, maxsize)
-        wrapper.size = size
-        wrapper.memuse = memuse
-        wrapper.overhead = overhead
-        return wrapper
-    return decorator
-
-def bigaddrspacetest(f):
-    """Decorator for tests that fill the address space."""
-    def wrapper(self):
-        if max_memuse < MAX_Py_ssize_t:
-            if verbose:
-                sys.stderr.write("Skipping %s because of memory "
-                                 "constraint\n" % (f.__name__,))
-        else:
-            return f(self)
-    return wrapper
-
-#=======================================================================
-# unittest integration.
-
-class BasicTestRunner:
-    def run(self, test):
-        result = unittest.TestResult()
-        test(result)
-        return result
-
-def _id(obj):
-    return obj
-
-def requires_resource(resource):
-    if resource == 'gui' and not _is_gui_available():
-        return unittest.skip(_is_gui_available.reason)
-    if is_resource_enabled(resource):
-        return _id
-    else:
-        return unittest.skip("resource {0!r} is not enabled".format(resource))
-
-def cpython_only(test):
-    """
-    Decorator for tests only applicable on CPython.
-    """
-    return impl_detail(cpython=True)(test)
-
-def impl_detail(msg=None, **guards):
-    if check_impl_detail(**guards):
-        return _id
-    if msg is None:
-        guardnames, default = _parse_guards(guards)
-        if default:
-            msg = "implementation detail not available on {0}"
-        else:
-            msg = "implementation detail specific to {0}"
-        guardnames = sorted(guardnames.keys())
-        msg = msg.format(' or '.join(guardnames))
-    return unittest.skip(msg)
-
-def _parse_guards(guards):
-    # Returns a tuple ({platform_name: run_me}, default_value)
-    if not guards:
-        return ({'cpython': True}, False)
-    is_true = guards.values()[0]
-    assert guards.values() == [is_true] * len(guards)   # all True or all False
-    return (guards, not is_true)
-
-# Use the following check to guard CPython's implementation-specific tests --
-# or to run them only on the implementation(s) guarded by the arguments.
-def check_impl_detail(**guards):
-    """This function returns True or False depending on the host platform.
-       Examples:
-          if check_impl_detail():               # only on CPython (default)
-          if check_impl_detail(jython=True):    # only on Jython
-          if check_impl_detail(cpython=False):  # everywhere except on CPython
-    """
-    guards, default = _parse_guards(guards)
-    return guards.get(platform.python_implementation().lower(), default)
-
-
-
-def _run_suite(suite):
-    """Run tests from a unittest.TestSuite-derived class."""
-    if verbose:
-        runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
-    else:
-        runner = BasicTestRunner()
-
-    result = runner.run(suite)
-    if not result.wasSuccessful():
-        if len(result.errors) == 1 and not result.failures:
-            err = result.errors[0][1]
-        elif len(result.failures) == 1 and not result.errors:
-            err = result.failures[0][1]
-        else:
-            err = "multiple errors occurred"
-            if not verbose:
-                err += "; run in verbose mode for details"
-        raise TestFailed(err)
-
-
-def run_unittest(*classes):
-    """Run tests from unittest.TestCase-derived classes."""
-    valid_types = (unittest.TestSuite, unittest.TestCase)
-    suite = unittest.TestSuite()
-    for cls in classes:
-        if isinstance(cls, str):
-            if cls in sys.modules:
-                suite.addTest(unittest.findTestCases(sys.modules[cls]))
-            else:
-                raise ValueError("str arguments must be keys in sys.modules")
-        elif isinstance(cls, valid_types):
-            suite.addTest(cls)
-        else:
-            suite.addTest(unittest.makeSuite(cls))
-    _run_suite(suite)
-
-#=======================================================================
-# Check for the presence of docstrings.
-
-HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
-                   sys.platform == 'win32' or
-                   sysconfig.get_config_var('WITH_DOC_STRINGS'))
-
-requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
-                                          "test requires docstrings")
-
-
-#=======================================================================
-# doctest driver.
-
-def run_doctest(module, verbosity=None):
-    """Run doctest on the given module.  Return (#failures, #tests).
-
-    If optional argument verbosity is not specified (or is None), pass
-    test_support's belief about verbosity on to doctest.  Else doctest's
-    usual behavior is used (it searches sys.argv for -v).
-    """
-
-    import doctest
-
-    if verbosity is None:
-        verbosity = verbose
-    else:
-        verbosity = None
-
-    # Direct doctest output (normally just errors) to real stdout; doctest
-    # output shouldn't be compared by regrtest.
-    save_stdout = sys.stdout
-    sys.stdout = get_original_stdout()
-    try:
-        f, t = doctest.testmod(module, verbose=verbosity)
-        if f:
-            raise TestFailed("%d of %d doctests failed" % (f, t))
-    finally:
-        sys.stdout = save_stdout
-    if verbose:
-        print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
-    return f, t
-
-#=======================================================================
-# Threading support to prevent reporting refleaks when running regrtest.py -R
-
-# NOTE: we use thread._count() rather than threading.enumerate() (or the
-# moral equivalent thereof) because a threading.Thread object is still alive
-# until its __bootstrap() method has returned, even after it has been
-# unregistered from the threading module.
-# thread._count(), on the other hand, only gets decremented *after* the
-# __bootstrap() method has returned, which gives us reliable reference counts
-# at the end of a test run.
-
-def threading_setup():
-    if thread:
-        return thread._count(),
-    else:
-        return 1,
-
-def threading_cleanup(nb_threads):
-    if not thread:
-        return
-
-    _MAX_COUNT = 10
-    for count in range(_MAX_COUNT):
-        n = thread._count()
-        if n == nb_threads:
-            break
-        time.sleep(0.1)
-    # XXX print a warning in case of failure?
-
-def reap_threads(func):
-    """Use this function when threads are being used.  This will
-    ensure that the threads are cleaned up even when the test fails.
-    If threading is unavailable this function does nothing.
-    """
-    if not thread:
-        return func
-
-    @functools.wraps(func)
-    def decorator(*args):
-        key = threading_setup()
-        try:
-            return func(*args)
-        finally:
-            threading_cleanup(*key)
-    return decorator
-
-def reap_children():
-    """Use this function at the end of test_main() whenever sub-processes
-    are started.  This will help ensure that no extra children (zombies)
-    stick around to hog resources and create problems when looking
-    for refleaks.
-    """
-
-    # Reap all our dead child processes so we don't leave zombies around.
-    # These hog resources and might be causing some of the buildbots to die.
-    if hasattr(os, 'waitpid'):
-        any_process = -1
-        while True:
-            try:
-                # This will raise an exception on Windows.  That's ok.
-                pid, status = os.waitpid(any_process, os.WNOHANG)
-                if pid == 0:
-                    break
-            except:
-                break
-
-@contextlib.contextmanager
-def start_threads(threads, unlock=None):
-    threads = list(threads)
-    started = []
-    try:
-        try:
-            for t in threads:
-                t.start()
-                started.append(t)
-        except:
-            if verbose:
-                print("Can't start %d threads, only %d threads started" %
-                      (len(threads), len(started)))
-            raise
-        yield
-    finally:
-        if unlock:
-            unlock()
-        endtime = starttime = time.time()
-        for timeout in range(1, 16):
-            endtime += 60
-            for t in started:
-                t.join(max(endtime - time.time(), 0.01))
-            started = [t for t in started if t.isAlive()]
-            if not started:
-                break
-            if verbose:
-                print('Unable to join %d threads during a period of '
-                      '%d minutes' % (len(started), timeout))
-    started = [t for t in started if t.isAlive()]
-    if started:
-        raise AssertionError('Unable to join %d threads' % len(started))
-
-@contextlib.contextmanager
-def swap_attr(obj, attr, new_val):
-    """Temporary swap out an attribute with a new object.
-
-    Usage:
-        with swap_attr(obj, "attr", 5):
-            ...
-
-        This will set obj.attr to 5 for the duration of the with: block,
-        restoring the old value at the end of the block. If `attr` doesn't
-        exist on `obj`, it will be created and then deleted at the end of the
-        block.
-
-        The old value (or None if it doesn't exist) will be assigned to the
-        target of the "as" clause, if there is one.
-    """
-    if hasattr(obj, attr):
-        real_val = getattr(obj, attr)
-        setattr(obj, attr, new_val)
-        try:
-            yield real_val
-        finally:
-            setattr(obj, attr, real_val)
-    else:
-        setattr(obj, attr, new_val)
-        try:
-            yield
-        finally:
-            if hasattr(obj, attr):
-                delattr(obj, attr)
-
-@contextlib.contextmanager
-def swap_item(obj, item, new_val):
-    """Temporary swap out an item with a new object.
-
-    Usage:
-        with swap_item(obj, "item", 5):
-            ...
-
-        This will set obj["item"] to 5 for the duration of the with: block,
-        restoring the old value at the end of the block. If `item` doesn't
-        exist on `obj`, it will be created and then deleted at the end of the
-        block.
-
-        The old value (or None if it doesn't exist) will be assigned to the
-        target of the "as" clause, if there is one.
-    """
-    if item in obj:
-        real_val = obj[item]
-        obj[item] = new_val
-        try:
-            yield real_val
-        finally:
-            obj[item] = real_val
-    else:
-        obj[item] = new_val
-        try:
-            yield
-        finally:
-            if item in obj:
-                del obj[item]
-
-def py3k_bytes(b):
-    """Emulate the py3k bytes() constructor.
-
-    NOTE: This is only a best effort function.
-    """
-    try:
-        # memoryview?
-        return b.tobytes()
-    except AttributeError:
-        try:
-            # iterable of ints?
-            return b"".join(chr(x) for x in b)
-        except TypeError:
-            return bytes(b)
-
-def args_from_interpreter_flags():
-    """Return a list of command-line arguments reproducing the current
-    settings in sys.flags."""
-    import subprocess
-    return subprocess._args_from_interpreter_flags()
-
-def strip_python_stderr(stderr):
-    """Strip the stderr of a Python process from potential debug output
-    emitted by the interpreter.
-
-    This will typically be run on the result of the communicate() method
-    of a subprocess.Popen object.
-    """
-    stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
-    return stderr
-
-
-def check_free_after_iterating(test, iter, cls, args=()):
-    class A(cls):
-        def __del__(self):
-            done[0] = True
-            try:
-                next(it)
-            except StopIteration:
-                pass
-
-    done = [False]
-    it = iter(A(*args))
-    # Issue 26494: Shouldn't crash
-    test.assertRaises(StopIteration, next, it)
-    # The sequence should be deallocated just after the end of iterating
-    gc_collect()
-    test.assertTrue(done[0])
-
-@contextlib.contextmanager
-def disable_gc():
-    have_gc = gc.isenabled()
-    gc.disable()
-    try:
-        yield
-    finally:
-        if have_gc:
-            gc.enable()
+import test.support
+sys.modules['test.test_support'] = test.support
index 8ec9f2a0eb167b2b95a7cb1f7d3d8b629a1fcc6f..4c92858d15693a71218fad5135ce0438c632f56c 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -144,6 +144,12 @@ Build
 Tests
 -----
 
+- bpo-30207: To simplify backports from Python 3, the test.test_support
+  module was converted into a package and renamed to test.support.  The
+  test.script_helper module was moved into the test.support package.
+  Names test.test_support and test.script_helper are left as aliases to
+  test.support and test.support.script_helper.
+
 - bpo-30197: Enhanced function swap_attr() in the test.test_support module.
   It now works when delete replaced attribute inside the with statement.  The
   old value of the attribute (or None if it doesn't exist) now will be