The :mod:`test` package contains all regression tests for Python as well as the
-modules :mod:`test.test_support` and :mod:`test.regrtest`.
-:mod:`test.test_support` is used to enhance your tests while
+modules :mod:`test.support` and :mod:`test.regrtest`.
+:mod:`test.support` is used to enhance your tests while
:mod:`test.regrtest` drives the testing suite.
Each module in the :mod:`test` package whose name starts with ``test_`` is a
A basic boilerplate is often used::
import unittest
- from test import test_support
+ from test import support
class MyTestCase1(unittest.TestCase):
... more test classes ...
def test_main():
- test_support.run_unittest(MyTestCase1,
- MyTestCase2,
- ... list other tests ...
- )
+ support.run_unittest(MyTestCase1,
+ MyTestCase2,
+ ... list other tests ...
+ )
if __name__ == '__main__':
test_main()
tests.
-:mod:`test.test_support` --- Utility functions for tests
-========================================================
+:mod:`test.support` --- Utility functions for tests
+===================================================
-.. module:: test.test_support
+.. module:: test.support
:synopsis: Support for Python regression tests.
.. note::
The :mod:`test.test_support` module has been renamed to :mod:`test.support`
- in Python 3.x.
+ in Python 3.x and 2.7.13. The name ``test.test_support`` has been retained
+ as an alias in 2.7.
-The :mod:`test.test_support` module provides support for Python's regression
+The :mod:`test.support` module provides support for Python's regression
tests.
This module defines the following exceptions:
network connection) is not available. Raised by the :func:`requires`
function.
-The :mod:`test.test_support` module defines the following constants:
+The :mod:`test.support` module defines the following constants:
.. data:: verbose
Set to a name that is safe to use as the name of a temporary file. Any
temporary file that is created should be closed and unlinked (removed).
-The :mod:`test.test_support` module defines the following functions:
+The :mod:`test.support` module defines the following functions:
.. function:: forget(module_name)
following :func:`test_main` function::
def test_main():
- test_support.run_unittest(__name__)
+ support.run_unittest(__name__)
This will run all tests defined in the named module.
.. versionadded:: 2.7
-The :mod:`test.test_support` module defines the following classes:
+The :mod:`test.support` module defines the following classes:
.. class:: TransientResource(exc[, **kwargs])
-# Common utility functions used by various script execution tests
-# e.g. test_cmd_line, test_cmd_line_script and test_runpy
-
-import sys
-import os
-import re
-import os.path
-import tempfile
-import subprocess
-import py_compile
-import contextlib
-import shutil
-try:
- import zipfile
-except ImportError:
- # If Python is build without Unicode support, importing _io will
- # fail, which, in turn, means that zipfile cannot be imported
- # Most of this module can then still be used.
- pass
-
-from test.test_support import strip_python_stderr
-
-# Executing the interpreter in a subprocess
-def _assert_python(expected_success, *args, **env_vars):
- cmd_line = [sys.executable]
- if not env_vars:
- cmd_line.append('-E')
- cmd_line.extend(args)
- # Need to preserve the original environment, for in-place testing of
- # shared library builds.
- env = os.environ.copy()
- env.update(env_vars)
- p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- env=env)
- try:
- out, err = p.communicate()
- finally:
- subprocess._cleanup()
- p.stdout.close()
- p.stderr.close()
- rc = p.returncode
- err = strip_python_stderr(err)
- if (rc and expected_success) or (not rc and not expected_success):
- raise AssertionError(
- "Process return code is %d, "
- "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
- return rc, out, err
-
-def assert_python_ok(*args, **env_vars):
- """
- Assert that running the interpreter with `args` and optional environment
- variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
- """
- return _assert_python(True, *args, **env_vars)
-
-def assert_python_failure(*args, **env_vars):
- """
- Assert that running the interpreter with `args` and optional environment
- variables `env_vars` fails and return a (return code, stdout, stderr) tuple.
- """
- return _assert_python(False, *args, **env_vars)
-
-def python_exit_code(*args):
- cmd_line = [sys.executable, '-E']
- cmd_line.extend(args)
- with open(os.devnull, 'w') as devnull:
- return subprocess.call(cmd_line, stdout=devnull,
- stderr=subprocess.STDOUT)
-
-def spawn_python(*args, **kwargs):
- cmd_line = [sys.executable, '-E']
- cmd_line.extend(args)
- return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
- **kwargs)
-
-def kill_python(p):
- p.stdin.close()
- data = p.stdout.read()
- p.stdout.close()
- # try to cleanup the child so we don't appear to leak when running
- # with regrtest -R.
- p.wait()
- subprocess._cleanup()
- return data
-
-def run_python(*args, **kwargs):
- if __debug__:
- p = spawn_python(*args, **kwargs)
- else:
- p = spawn_python('-O', *args, **kwargs)
- stdout_data = kill_python(p)
- return p.wait(), stdout_data
-
-# Script creation utilities
-@contextlib.contextmanager
-def temp_dir():
- dirname = tempfile.mkdtemp()
- dirname = os.path.realpath(dirname)
- try:
- yield dirname
- finally:
- shutil.rmtree(dirname)
-
-def make_script(script_dir, script_basename, source):
- script_filename = script_basename+os.extsep+'py'
- script_name = os.path.join(script_dir, script_filename)
- script_file = open(script_name, 'w')
- script_file.write(source)
- script_file.close()
- return script_name
-
-def compile_script(script_name):
- py_compile.compile(script_name, doraise=True)
- if __debug__:
- compiled_name = script_name + 'c'
- else:
- compiled_name = script_name + 'o'
- return compiled_name
-
-def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
- zip_filename = zip_basename+os.extsep+'zip'
- zip_name = os.path.join(zip_dir, zip_filename)
- zip_file = zipfile.ZipFile(zip_name, 'w')
- if name_in_zip is None:
- name_in_zip = os.path.basename(script_name)
- zip_file.write(script_name, name_in_zip)
- zip_file.close()
- #if test.test_support.verbose:
- # zip_file = zipfile.ZipFile(zip_name, 'r')
- # print 'Contents of %r:' % zip_name
- # zip_file.printdir()
- # zip_file.close()
- return zip_name, os.path.join(zip_name, name_in_zip)
-
-def make_pkg(pkg_dir, init_source=''):
- os.mkdir(pkg_dir)
- make_script(pkg_dir, '__init__', init_source)
-
-def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
- source, depth=1, compiled=False):
- unlink = []
- init_name = make_script(zip_dir, '__init__', '')
- unlink.append(init_name)
- init_basename = os.path.basename(init_name)
- script_name = make_script(zip_dir, script_basename, source)
- unlink.append(script_name)
- if compiled:
- init_name = compile_script(init_name)
- script_name = compile_script(script_name)
- unlink.extend((init_name, script_name))
- pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
- script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
- zip_filename = zip_basename+os.extsep+'zip'
- zip_name = os.path.join(zip_dir, zip_filename)
- zip_file = zipfile.ZipFile(zip_name, 'w')
- for name in pkg_names:
- init_name_in_zip = os.path.join(name, init_basename)
- zip_file.write(init_name, init_name_in_zip)
- zip_file.write(script_name, script_name_in_zip)
- zip_file.close()
- for name in unlink:
- os.unlink(name)
- #if test.test_support.verbose:
- # zip_file = zipfile.ZipFile(zip_name, 'r')
- # print 'Contents of %r:' % zip_name
- # zip_file.printdir()
- # zip_file.close()
- return zip_name, os.path.join(zip_name, script_name_in_zip)
+from test.support.script_helper import *
--- /dev/null
+"""Supporting definitions for the Python regression tests."""
+
+if __name__ != 'test.support':
+ raise ImportError('test.support must be imported from the test package')
+
+import contextlib
+import errno
+import functools
+import gc
+import socket
+import stat
+import sys
+import os
+import platform
+import shutil
+import warnings
+import unittest
+import importlib
+import UserDict
+import re
+import time
+import struct
+import sysconfig
+try:
+ import thread
+except ImportError:
+ thread = None
+
+__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
+ "verbose", "use_resources", "max_memuse", "record_original_stdout",
+ "get_original_stdout", "unload", "unlink", "rmtree", "forget",
+ "is_resource_enabled", "requires", "requires_mac_ver",
+ "find_unused_port", "bind_port",
+ "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
+ "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
+ "open_urlresource", "check_warnings", "check_py3k_warnings",
+ "CleanImport", "EnvironmentVarGuard", "captured_output",
+ "captured_stdout", "TransientResource", "transient_internet",
+ "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
+ "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
+ "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
+ "check_impl_detail", "get_attribute", "py3k_bytes",
+ "import_fresh_module", "threading_cleanup", "reap_children",
+ "strip_python_stderr", "IPV6_ENABLED", "run_with_tz"]
+
+class Error(Exception):
+ """Base class for regression test exceptions."""
+
+class TestFailed(Error):
+ """Test failed."""
+
+class ResourceDenied(unittest.SkipTest):
+ """Test skipped because it requested a disallowed resource.
+
+ This is raised when a test calls requires() for a resource that
+ has not been enabled. It is used to distinguish between expected
+ and unexpected skips.
+ """
+
+@contextlib.contextmanager
+def _ignore_deprecated_imports(ignore=True):
+ """Context manager to suppress package and module deprecation
+ warnings when importing them.
+
+ If ignore is False, this context manager has no effect."""
+ if ignore:
+ with warnings.catch_warnings():
+ warnings.filterwarnings("ignore", ".+ (module|package)",
+ DeprecationWarning)
+ yield
+ else:
+ yield
+
+
+def import_module(name, deprecated=False):
+ """Import and return the module to be tested, raising SkipTest if
+ it is not available.
+
+ If deprecated is True, any module or package deprecation messages
+ will be suppressed."""
+ with _ignore_deprecated_imports(deprecated):
+ try:
+ return importlib.import_module(name)
+ except ImportError, msg:
+ raise unittest.SkipTest(str(msg))
+
+
+def _save_and_remove_module(name, orig_modules):
+ """Helper function to save and remove a module from sys.modules
+
+ Raise ImportError if the module can't be imported."""
+ # try to import the module and raise an error if it can't be imported
+ if name not in sys.modules:
+ __import__(name)
+ del sys.modules[name]
+ for modname in list(sys.modules):
+ if modname == name or modname.startswith(name + '.'):
+ orig_modules[modname] = sys.modules[modname]
+ del sys.modules[modname]
+
+def _save_and_block_module(name, orig_modules):
+ """Helper function to save and block a module in sys.modules
+
+ Return True if the module was in sys.modules, False otherwise."""
+ saved = True
+ try:
+ orig_modules[name] = sys.modules[name]
+ except KeyError:
+ saved = False
+ sys.modules[name] = None
+ return saved
+
+
+def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
+ """Imports and returns a module, deliberately bypassing the sys.modules cache
+ and importing a fresh copy of the module. Once the import is complete,
+ the sys.modules cache is restored to its original state.
+
+ Modules named in fresh are also imported anew if needed by the import.
+ If one of these modules can't be imported, None is returned.
+
+ Importing of modules named in blocked is prevented while the fresh import
+ takes place.
+
+ If deprecated is True, any module or package deprecation messages
+ will be suppressed."""
+ # NOTE: test_heapq, test_json, and test_warnings include extra sanity
+ # checks to make sure that this utility function is working as expected
+ with _ignore_deprecated_imports(deprecated):
+ # Keep track of modules saved for later restoration as well
+ # as those which just need a blocking entry removed
+ orig_modules = {}
+ names_to_remove = []
+ _save_and_remove_module(name, orig_modules)
+ try:
+ for fresh_name in fresh:
+ _save_and_remove_module(fresh_name, orig_modules)
+ for blocked_name in blocked:
+ if not _save_and_block_module(blocked_name, orig_modules):
+ names_to_remove.append(blocked_name)
+ fresh_module = importlib.import_module(name)
+ except ImportError:
+ fresh_module = None
+ finally:
+ for orig_name, module in orig_modules.items():
+ sys.modules[orig_name] = module
+ for name_to_remove in names_to_remove:
+ del sys.modules[name_to_remove]
+ return fresh_module
+
+
+def get_attribute(obj, name):
+ """Get an attribute, raising SkipTest if AttributeError is raised."""
+ try:
+ attribute = getattr(obj, name)
+ except AttributeError:
+ raise unittest.SkipTest("module %s has no attribute %s" % (
+ obj.__name__, name))
+ else:
+ return attribute
+
+
+verbose = 1 # Flag set to 0 by regrtest.py
+use_resources = None # Flag set to [] by regrtest.py
+max_memuse = 0 # Disable bigmem tests (they will still be run with
+ # small sizes, to make sure they work.)
+real_max_memuse = 0
+
+# _original_stdout is meant to hold stdout at the time regrtest began.
+# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
+# The point is to have some flavor of stdout the user can actually see.
+_original_stdout = None
+def record_original_stdout(stdout):
+ global _original_stdout
+ _original_stdout = stdout
+
+def get_original_stdout():
+ return _original_stdout or sys.stdout
+
+def unload(name):
+ try:
+ del sys.modules[name]
+ except KeyError:
+ pass
+
+def _force_run(path, func, *args):
+ try:
+ return func(*args)
+ except EnvironmentError as err:
+ if verbose >= 2:
+ print('%s: %s' % (err.__class__.__name__, err))
+ print('re-run %s%r' % (func.__name__, args))
+ os.chmod(path, stat.S_IRWXU)
+ return func(*args)
+
+if sys.platform.startswith("win"):
+ def _waitfor(func, pathname, waitall=False):
+ # Perform the operation
+ func(pathname)
+ # Now setup the wait loop
+ if waitall:
+ dirname = pathname
+ else:
+ dirname, name = os.path.split(pathname)
+ dirname = dirname or '.'
+ # Check for `pathname` to be removed from the filesystem.
+ # The exponential backoff of the timeout amounts to a total
+ # of ~1 second after which the deletion is probably an error
+ # anyway.
+ # Testing on an i7@4.3GHz shows that usually only 1 iteration is
+ # required when contention occurs.
+ timeout = 0.001
+ while timeout < 1.0:
+ # Note we are only testing for the existence of the file(s) in
+ # the contents of the directory regardless of any security or
+ # access rights. If we have made it this far, we have sufficient
+ # permissions to do that much using Python's equivalent of the
+ # Windows API FindFirstFile.
+ # Other Windows APIs can fail or give incorrect results when
+ # dealing with files that are pending deletion.
+ L = os.listdir(dirname)
+ if not (L if waitall else name in L):
+ return
+ # Increase the timeout and try again
+ time.sleep(timeout)
+ timeout *= 2
+ warnings.warn('tests may fail, delete still pending for ' + pathname,
+ RuntimeWarning, stacklevel=4)
+
+ def _unlink(filename):
+ _waitfor(os.unlink, filename)
+
+ def _rmdir(dirname):
+ _waitfor(os.rmdir, dirname)
+
+ def _rmtree(path):
+ def _rmtree_inner(path):
+ for name in _force_run(path, os.listdir, path):
+ fullname = os.path.join(path, name)
+ if os.path.isdir(fullname):
+ _waitfor(_rmtree_inner, fullname, waitall=True)
+ _force_run(fullname, os.rmdir, fullname)
+ else:
+ _force_run(fullname, os.unlink, fullname)
+ _waitfor(_rmtree_inner, path, waitall=True)
+ _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
+else:
+ _unlink = os.unlink
+ _rmdir = os.rmdir
+
+ def _rmtree(path):
+ try:
+ shutil.rmtree(path)
+ return
+ except EnvironmentError:
+ pass
+
+ def _rmtree_inner(path):
+ for name in _force_run(path, os.listdir, path):
+ fullname = os.path.join(path, name)
+ try:
+ mode = os.lstat(fullname).st_mode
+ except EnvironmentError:
+ mode = 0
+ if stat.S_ISDIR(mode):
+ _rmtree_inner(fullname)
+ _force_run(path, os.rmdir, fullname)
+ else:
+ _force_run(path, os.unlink, fullname)
+ _rmtree_inner(path)
+ os.rmdir(path)
+
+def unlink(filename):
+ try:
+ _unlink(filename)
+ except OSError:
+ pass
+
+def rmdir(dirname):
+ try:
+ _rmdir(dirname)
+ except OSError as error:
+ # The directory need not exist.
+ if error.errno != errno.ENOENT:
+ raise
+
+def rmtree(path):
+ try:
+ _rmtree(path)
+ except OSError, e:
+ # Unix returns ENOENT, Windows returns ESRCH.
+ if e.errno not in (errno.ENOENT, errno.ESRCH):
+ raise
+
+def forget(modname):
+ '''"Forget" a module was ever imported by removing it from sys.modules and
+ deleting any .pyc and .pyo files.'''
+ unload(modname)
+ for dirname in sys.path:
+ unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
+ # Deleting the .pyo file cannot be within the 'try' for the .pyc since
+ # the chance exists that there is no .pyc (and thus the 'try' statement
+ # is exited) but there is a .pyo file.
+ unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
+
+# Check whether a gui is actually available
+def _is_gui_available():
+ if hasattr(_is_gui_available, 'result'):
+ return _is_gui_available.result
+ reason = None
+ if sys.platform.startswith('win'):
+ # if Python is running as a service (such as the buildbot service),
+ # gui interaction may be disallowed
+ import ctypes
+ import ctypes.wintypes
+ UOI_FLAGS = 1
+ WSF_VISIBLE = 0x0001
+ class USEROBJECTFLAGS(ctypes.Structure):
+ _fields_ = [("fInherit", ctypes.wintypes.BOOL),
+ ("fReserved", ctypes.wintypes.BOOL),
+ ("dwFlags", ctypes.wintypes.DWORD)]
+ dll = ctypes.windll.user32
+ h = dll.GetProcessWindowStation()
+ if not h:
+ raise ctypes.WinError()
+ uof = USEROBJECTFLAGS()
+ needed = ctypes.wintypes.DWORD()
+ res = dll.GetUserObjectInformationW(h,
+ UOI_FLAGS,
+ ctypes.byref(uof),
+ ctypes.sizeof(uof),
+ ctypes.byref(needed))
+ if not res:
+ raise ctypes.WinError()
+ if not bool(uof.dwFlags & WSF_VISIBLE):
+ reason = "gui not available (WSF_VISIBLE flag not set)"
+ elif sys.platform == 'darwin':
+ # The Aqua Tk implementations on OS X can abort the process if
+ # being called in an environment where a window server connection
+ # cannot be made, for instance when invoked by a buildbot or ssh
+ # process not running under the same user id as the current console
+ # user. To avoid that, raise an exception if the window manager
+ # connection is not available.
+ from ctypes import cdll, c_int, pointer, Structure
+ from ctypes.util import find_library
+
+ app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
+
+ if app_services.CGMainDisplayID() == 0:
+ reason = "gui tests cannot run without OS X window manager"
+ else:
+ class ProcessSerialNumber(Structure):
+ _fields_ = [("highLongOfPSN", c_int),
+ ("lowLongOfPSN", c_int)]
+ psn = ProcessSerialNumber()
+ psn_p = pointer(psn)
+ if ( (app_services.GetCurrentProcess(psn_p) < 0) or
+ (app_services.SetFrontProcess(psn_p) < 0) ):
+ reason = "cannot run without OS X gui process"
+
+ # check on every platform whether tkinter can actually do anything
+ if not reason:
+ try:
+ from Tkinter import Tk
+ root = Tk()
+ root.withdraw()
+ root.update()
+ root.destroy()
+ except Exception as e:
+ err_string = str(e)
+ if len(err_string) > 50:
+ err_string = err_string[:50] + ' [...]'
+ reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
+ err_string)
+
+ _is_gui_available.reason = reason
+ _is_gui_available.result = not reason
+
+ return _is_gui_available.result
+
+def is_resource_enabled(resource):
+ """Test whether a resource is enabled.
+
+ Known resources are set by regrtest.py. If not running under regrtest.py,
+ all resources are assumed enabled unless use_resources has been set.
+ """
+ return use_resources is None or resource in use_resources
+
+def requires(resource, msg=None):
+ """Raise ResourceDenied if the specified resource is not available."""
+ if not is_resource_enabled(resource):
+ if msg is None:
+ msg = "Use of the `%s' resource not enabled" % resource
+ raise ResourceDenied(msg)
+ if resource == 'gui' and not _is_gui_available():
+ raise ResourceDenied(_is_gui_available.reason)
+
+def requires_mac_ver(*min_version):
+ """Decorator raising SkipTest if the OS is Mac OS X and the OS X
+ version if less than min_version.
+
+ For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
+ is lesser than 10.5.
+ """
+ def decorator(func):
+ @functools.wraps(func)
+ def wrapper(*args, **kw):
+ if sys.platform == 'darwin':
+ version_txt = platform.mac_ver()[0]
+ try:
+ version = tuple(map(int, version_txt.split('.')))
+ except ValueError:
+ pass
+ else:
+ if version < min_version:
+ min_version_txt = '.'.join(map(str, min_version))
+ raise unittest.SkipTest(
+ "Mac OS X %s or higher required, not %s"
+ % (min_version_txt, version_txt))
+ return func(*args, **kw)
+ wrapper.min_version = min_version
+ return wrapper
+ return decorator
+
+
+# Don't use "localhost", since resolving it uses the DNS under recent
+# Windows versions (see issue #18792).
+HOST = "127.0.0.1"
+HOSTv6 = "::1"
+
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the same family and type as
+ the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+ the specified host address (defaults to 0.0.0.0) with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ http://bugs.python.org/issue2550 for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it."""
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the sock.family
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise TestFailed("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ try:
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise TestFailed("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ except EnvironmentError:
+ # Python's socket module was compiled using modern headers
+ # thus defining SO_REUSEPORT but this process is running
+ # under an older kernel that does not support SO_REUSEPORT.
+ pass
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
+
+def _is_ipv6_enabled():
+ """Check whether IPv6 is enabled on this host."""
+ if socket.has_ipv6:
+ sock = None
+ try:
+ sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
+ sock.bind((HOSTv6, 0))
+ return True
+ except socket.error:
+ pass
+ finally:
+ if sock:
+ sock.close()
+ return False
+
+IPV6_ENABLED = _is_ipv6_enabled()
+
+def system_must_validate_cert(f):
+ """Skip the test on TLS certificate validation failures."""
+ @functools.wraps(f)
+ def dec(*args, **kwargs):
+ try:
+ f(*args, **kwargs)
+ except IOError as e:
+ if "CERTIFICATE_VERIFY_FAILED" in str(e):
+ raise unittest.SkipTest("system does not contain "
+ "necessary certificates")
+ raise
+ return dec
+
+FUZZ = 1e-6
+
+def fcmp(x, y): # fuzzy comparison function
+ if isinstance(x, float) or isinstance(y, float):
+ try:
+ fuzz = (abs(x) + abs(y)) * FUZZ
+ if abs(x-y) <= fuzz:
+ return 0
+ except:
+ pass
+ elif type(x) == type(y) and isinstance(x, (tuple, list)):
+ for i in range(min(len(x), len(y))):
+ outcome = fcmp(x[i], y[i])
+ if outcome != 0:
+ return outcome
+ return (len(x) > len(y)) - (len(x) < len(y))
+ return (x > y) - (x < y)
+
+
+# A constant likely larger than the underlying OS pipe buffer size, to
+# make writes blocking.
+# Windows limit seems to be around 512 B, and many Unix kernels have a
+# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
+# (see issue #17835 for a discussion of this number).
+PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
+
+# A constant likely larger than the underlying OS socket buffer size, to make
+# writes blocking.
+# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
+# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
+# for a discussion of this number).
+SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
+
+is_jython = sys.platform.startswith('java')
+
+try:
+ unicode
+ have_unicode = True
+except NameError:
+ have_unicode = False
+
+requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
+
+def u(s):
+ return unicode(s, 'unicode-escape')
+
+# FS_NONASCII: non-ASCII Unicode character encodable by
+# sys.getfilesystemencoding(), or None if there is no such character.
+FS_NONASCII = None
+if have_unicode:
+ for character in (
+ # First try printable and common characters to have a readable filename.
+ # For each character, the encoding list are just example of encodings able
+ # to encode the character (the list is not exhaustive).
+
+ # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
+ unichr(0x00E6),
+ # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
+ unichr(0x0130),
+ # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
+ unichr(0x0141),
+ # U+03C6 (Greek Small Letter Phi): cp1253
+ unichr(0x03C6),
+ # U+041A (Cyrillic Capital Letter Ka): cp1251
+ unichr(0x041A),
+ # U+05D0 (Hebrew Letter Alef): Encodable to cp424
+ unichr(0x05D0),
+ # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
+ unichr(0x060C),
+ # U+062A (Arabic Letter Teh): cp720
+ unichr(0x062A),
+ # U+0E01 (Thai Character Ko Kai): cp874
+ unichr(0x0E01),
+
+ # Then try more "special" characters. "special" because they may be
+ # interpreted or displayed differently depending on the exact locale
+ # encoding and the font.
+
+ # U+00A0 (No-Break Space)
+ unichr(0x00A0),
+ # U+20AC (Euro Sign)
+ unichr(0x20AC),
+ ):
+ try:
+ character.encode(sys.getfilesystemencoding())\
+ .decode(sys.getfilesystemencoding())
+ except UnicodeError:
+ pass
+ else:
+ FS_NONASCII = character
+ break
+
+# Filename used for testing
+if os.name == 'java':
+ # Jython disallows @ in module names
+ TESTFN = '$test'
+elif os.name == 'riscos':
+ TESTFN = 'testfile'
+else:
+ TESTFN = '@test'
+ # Unicode name only used if TEST_FN_ENCODING exists for the platform.
+ if have_unicode:
+ # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
+ # TESTFN_UNICODE is a filename that can be encoded using the
+ # file system encoding, but *not* with the default (ascii) encoding
+ if isinstance('', unicode):
+ # python -U
+ # XXX perhaps unicode() should accept Unicode strings?
+ TESTFN_UNICODE = "@test-\xe0\xf2"
+ else:
+ # 2 latin characters.
+ TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
+ TESTFN_ENCODING = sys.getfilesystemencoding()
+ # TESTFN_UNENCODABLE is a filename that should *not* be
+ # able to be encoded by *either* the default or filesystem encoding.
+ # This test really only makes sense on Windows NT platforms
+ # which have special Unicode support in posixmodule.
+ if (not hasattr(sys, "getwindowsversion") or
+ sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
+ TESTFN_UNENCODABLE = None
+ else:
+ # Japanese characters (I think - from bug 846133)
+ TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
+ try:
+ # XXX - Note - should be using TESTFN_ENCODING here - but for
+ # Windows, "mbcs" currently always operates as if in
+ # errors=ignore' mode - hence we get '?' characters rather than
+ # the exception. 'Latin1' operates as we expect - ie, fails.
+ # See [ 850997 ] mbcs encoding ignores errors
+ TESTFN_UNENCODABLE.encode("Latin1")
+ except UnicodeEncodeError:
+ pass
+ else:
+ print \
+ 'WARNING: The filename %r CAN be encoded by the filesystem. ' \
+ 'Unicode filename tests may not be effective' \
+ % TESTFN_UNENCODABLE
+
+
+# Disambiguate TESTFN for parallel testing, while letting it remain a valid
+# module name.
+TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
+
+# Save the initial cwd
+SAVEDCWD = os.getcwd()
+
+@contextlib.contextmanager
+def change_cwd(path, quiet=False):
+ """Return a context manager that changes the current working directory.
+
+ Arguments:
+
+ path: the directory to use as the temporary current working directory.
+
+ quiet: if False (the default), the context manager raises an exception
+ on error. Otherwise, it issues only a warning and keeps the current
+ working directory the same.
+
+ """
+ saved_dir = os.getcwd()
+ try:
+ os.chdir(path)
+ except OSError:
+ if not quiet:
+ raise
+ warnings.warn('tests may fail, unable to change CWD to: ' + path,
+ RuntimeWarning, stacklevel=3)
+ try:
+ yield os.getcwd()
+ finally:
+ os.chdir(saved_dir)
+
+
+@contextlib.contextmanager
+def temp_cwd(name='tempcwd', quiet=False):
+ """
+ Context manager that creates a temporary directory and set it as CWD.
+
+ The new CWD is created in the current directory and it's named *name*.
+ If *quiet* is False (default) and it's not possible to create or change
+ the CWD, an error is raised. If it's True, only a warning is raised
+ and the original CWD is used.
+ """
+ if (have_unicode and isinstance(name, unicode) and
+ not os.path.supports_unicode_filenames):
+ try:
+ name = name.encode(sys.getfilesystemencoding() or 'ascii')
+ except UnicodeEncodeError:
+ if not quiet:
+ raise unittest.SkipTest('unable to encode the cwd name with '
+ 'the filesystem encoding.')
+ saved_dir = os.getcwd()
+ is_temporary = False
+ try:
+ os.mkdir(name)
+ os.chdir(name)
+ is_temporary = True
+ except OSError:
+ if not quiet:
+ raise
+ warnings.warn('tests may fail, unable to change the CWD to ' + name,
+ RuntimeWarning, stacklevel=3)
+ try:
+ yield os.getcwd()
+ finally:
+ os.chdir(saved_dir)
+ if is_temporary:
+ rmtree(name)
+
+# TEST_HOME_DIR refers to the top level directory of the "test" package
+# that contains Python's regression test suite
+TEST_SUPPORT_DIR = os.path.dirname(os.path.abspath(__file__))
+TEST_HOME_DIR = os.path.dirname(TEST_SUPPORT_DIR)
+
+# TEST_DATA_DIR is used as a target download location for remote resources
+TEST_DATA_DIR = os.path.join(TEST_HOME_DIR, "data")
+
+def findfile(file, subdir=None):
+ """Try to find a file on sys.path and the working directory. If it is not
+ found the argument passed to the function is returned (this does not
+ necessarily signal failure; could still be the legitimate path)."""
+ if os.path.isabs(file):
+ return file
+ if subdir is not None:
+ file = os.path.join(subdir, file)
+ path = [TEST_HOME_DIR] + sys.path
+ for dn in path:
+ fn = os.path.join(dn, file)
+ if os.path.exists(fn): return fn
+ return file
+
+def sortdict(dict):
+ "Like repr(dict), but in sorted order."
+ items = dict.items()
+ items.sort()
+ reprpairs = ["%r: %r" % pair for pair in items]
+ withcommas = ", ".join(reprpairs)
+ return "{%s}" % withcommas
+
+def make_bad_fd():
+ """
+ Create an invalid file descriptor by opening and closing a file and return
+ its fd.
+ """
+ file = open(TESTFN, "wb")
+ try:
+ return file.fileno()
+ finally:
+ file.close()
+ unlink(TESTFN)
+
+def check_syntax_error(testcase, statement):
+ testcase.assertRaises(SyntaxError, compile, statement,
+ '<test string>', 'exec')
+
+def open_urlresource(url, check=None):
+ import urlparse, urllib2
+
+ filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
+
+ fn = os.path.join(TEST_DATA_DIR, filename)
+
+ def check_valid_file(fn):
+ f = open(fn)
+ if check is None:
+ return f
+ elif check(f):
+ f.seek(0)
+ return f
+ f.close()
+
+ if os.path.exists(fn):
+ f = check_valid_file(fn)
+ if f is not None:
+ return f
+ unlink(fn)
+
+ # Verify the requirement before downloading the file
+ requires('urlfetch')
+
+ print >> get_original_stdout(), '\tfetching %s ...' % url
+ f = urllib2.urlopen(url, timeout=15)
+ try:
+ with open(fn, "wb") as out:
+ s = f.read()
+ while s:
+ out.write(s)
+ s = f.read()
+ finally:
+ f.close()
+
+ f = check_valid_file(fn)
+ if f is not None:
+ return f
+ raise TestFailed('invalid resource "%s"' % fn)
+
+
+class WarningsRecorder(object):
+ """Convenience wrapper for the warnings list returned on
+ entry to the warnings.catch_warnings() context manager.
+ """
+ def __init__(self, warnings_list):
+ self._warnings = warnings_list
+ self._last = 0
+
+ def __getattr__(self, attr):
+ if len(self._warnings) > self._last:
+ return getattr(self._warnings[-1], attr)
+ elif attr in warnings.WarningMessage._WARNING_DETAILS:
+ return None
+ raise AttributeError("%r has no attribute %r" % (self, attr))
+
+ @property
+ def warnings(self):
+ return self._warnings[self._last:]
+
+ def reset(self):
+ self._last = len(self._warnings)
+
+
+def _filterwarnings(filters, quiet=False):
+ """Catch the warnings, then check if all the expected
+ warnings have been raised and re-raise unexpected warnings.
+ If 'quiet' is True, only re-raise the unexpected warnings.
+ """
+ # Clear the warning registry of the calling module
+ # in order to re-raise the warnings.
+ frame = sys._getframe(2)
+ registry = frame.f_globals.get('__warningregistry__')
+ if registry:
+ registry.clear()
+ with warnings.catch_warnings(record=True) as w:
+ # Set filter "always" to record all warnings. Because
+ # test_warnings swap the module, we need to look up in
+ # the sys.modules dictionary.
+ sys.modules['warnings'].simplefilter("always")
+ yield WarningsRecorder(w)
+ # Filter the recorded warnings
+ reraise = [warning.message for warning in w]
+ missing = []
+ for msg, cat in filters:
+ seen = False
+ for exc in reraise[:]:
+ message = str(exc)
+ # Filter out the matching messages
+ if (re.match(msg, message, re.I) and
+ issubclass(exc.__class__, cat)):
+ seen = True
+ reraise.remove(exc)
+ if not seen and not quiet:
+ # This filter caught nothing
+ missing.append((msg, cat.__name__))
+ if reraise:
+ raise AssertionError("unhandled warning %r" % reraise[0])
+ if missing:
+ raise AssertionError("filter (%r, %s) did not catch any warning" %
+ missing[0])
+
+
+@contextlib.contextmanager
+def check_warnings(*filters, **kwargs):
+ """Context manager to silence warnings.
+
+ Accept 2-tuples as positional arguments:
+ ("message regexp", WarningCategory)
+
+ Optional argument:
+ - if 'quiet' is True, it does not fail if a filter catches nothing
+ (default True without argument,
+ default False if some filters are defined)
+
+ Without argument, it defaults to:
+ check_warnings(("", Warning), quiet=True)
+ """
+ quiet = kwargs.get('quiet')
+ if not filters:
+ filters = (("", Warning),)
+ # Preserve backward compatibility
+ if quiet is None:
+ quiet = True
+ return _filterwarnings(filters, quiet)
+
+
+@contextlib.contextmanager
+def check_py3k_warnings(*filters, **kwargs):
+ """Context manager to silence py3k warnings.
+
+ Accept 2-tuples as positional arguments:
+ ("message regexp", WarningCategory)
+
+ Optional argument:
+ - if 'quiet' is True, it does not fail if a filter catches nothing
+ (default False)
+
+ Without argument, it defaults to:
+ check_py3k_warnings(("", DeprecationWarning), quiet=False)
+ """
+ if sys.py3kwarning:
+ if not filters:
+ filters = (("", DeprecationWarning),)
+ else:
+ # It should not raise any py3k warning
+ filters = ()
+ return _filterwarnings(filters, kwargs.get('quiet'))
+
+
+class CleanImport(object):
+ """Context manager to force import to return a new module reference.
+
+ This is useful for testing module-level behaviours, such as
+ the emission of a DeprecationWarning on import.
+
+ Use like this:
+
+ with CleanImport("foo"):
+ importlib.import_module("foo") # new reference
+ """
+
+ def __init__(self, *module_names):
+ self.original_modules = sys.modules.copy()
+ for module_name in module_names:
+ if module_name in sys.modules:
+ module = sys.modules[module_name]
+ # It is possible that module_name is just an alias for
+ # another module (e.g. stub for modules renamed in 3.x).
+ # In that case, we also need delete the real module to clear
+ # the import cache.
+ if module.__name__ != module_name:
+ del sys.modules[module.__name__]
+ del sys.modules[module_name]
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *ignore_exc):
+ sys.modules.update(self.original_modules)
+
+
+class EnvironmentVarGuard(UserDict.DictMixin):
+
+ """Class to help protect the environment variable properly. Can be used as
+ a context manager."""
+
+ def __init__(self):
+ self._environ = os.environ
+ self._changed = {}
+
+ def __getitem__(self, envvar):
+ return self._environ[envvar]
+
+ def __setitem__(self, envvar, value):
+ # Remember the initial value on the first access
+ if envvar not in self._changed:
+ self._changed[envvar] = self._environ.get(envvar)
+ self._environ[envvar] = value
+
+ def __delitem__(self, envvar):
+ # Remember the initial value on the first access
+ if envvar not in self._changed:
+ self._changed[envvar] = self._environ.get(envvar)
+ if envvar in self._environ:
+ del self._environ[envvar]
+
+ def keys(self):
+ return self._environ.keys()
+
+ def set(self, envvar, value):
+ self[envvar] = value
+
+ def unset(self, envvar):
+ del self[envvar]
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *ignore_exc):
+ for (k, v) in self._changed.items():
+ if v is None:
+ if k in self._environ:
+ del self._environ[k]
+ else:
+ self._environ[k] = v
+ os.environ = self._environ
+
+
+class DirsOnSysPath(object):
+ """Context manager to temporarily add directories to sys.path.
+
+ This makes a copy of sys.path, appends any directories given
+ as positional arguments, then reverts sys.path to the copied
+ settings when the context ends.
+
+ Note that *all* sys.path modifications in the body of the
+ context manager, including replacement of the object,
+ will be reverted at the end of the block.
+ """
+
+ def __init__(self, *paths):
+ self.original_value = sys.path[:]
+ self.original_object = sys.path
+ sys.path.extend(paths)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *ignore_exc):
+ sys.path = self.original_object
+ sys.path[:] = self.original_value
+
+
+class TransientResource(object):
+
+ """Raise ResourceDenied if an exception is raised while the context manager
+ is in effect that matches the specified exception and attributes."""
+
+ def __init__(self, exc, **kwargs):
+ self.exc = exc
+ self.attrs = kwargs
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type_=None, value=None, traceback=None):
+ """If type_ is a subclass of self.exc and value has attributes matching
+ self.attrs, raise ResourceDenied. Otherwise let the exception
+ propagate (if any)."""
+ if type_ is not None and issubclass(self.exc, type_):
+ for attr, attr_value in self.attrs.iteritems():
+ if not hasattr(value, attr):
+ break
+ if getattr(value, attr) != attr_value:
+ break
+ else:
+ raise ResourceDenied("an optional resource is not available")
+
+
+@contextlib.contextmanager
+def transient_internet(resource_name, timeout=30.0, errnos=()):
+ """Return a context manager that raises ResourceDenied when various issues
+ with the Internet connection manifest themselves as exceptions."""
+ default_errnos = [
+ ('ECONNREFUSED', 111),
+ ('ECONNRESET', 104),
+ ('EHOSTUNREACH', 113),
+ ('ENETUNREACH', 101),
+ ('ETIMEDOUT', 110),
+ ]
+ default_gai_errnos = [
+ ('EAI_AGAIN', -3),
+ ('EAI_FAIL', -4),
+ ('EAI_NONAME', -2),
+ ('EAI_NODATA', -5),
+ # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
+ # implementation actually returns WSANO_DATA i.e. 11004.
+ ('WSANO_DATA', 11004),
+ ]
+
+ denied = ResourceDenied("Resource '%s' is not available" % resource_name)
+ captured_errnos = errnos
+ gai_errnos = []
+ if not captured_errnos:
+ captured_errnos = [getattr(errno, name, num)
+ for (name, num) in default_errnos]
+ gai_errnos = [getattr(socket, name, num)
+ for (name, num) in default_gai_errnos]
+
+ def filter_error(err):
+ n = getattr(err, 'errno', None)
+ if (isinstance(err, socket.timeout) or
+ (isinstance(err, socket.gaierror) and n in gai_errnos) or
+ n in captured_errnos):
+ if not verbose:
+ sys.stderr.write(denied.args[0] + "\n")
+ raise denied
+
+ old_timeout = socket.getdefaulttimeout()
+ try:
+ if timeout is not None:
+ socket.setdefaulttimeout(timeout)
+ yield
+ except IOError as err:
+ # urllib can wrap original socket errors multiple times (!), we must
+ # unwrap to get at the original error.
+ while True:
+ a = err.args
+ if len(a) >= 1 and isinstance(a[0], IOError):
+ err = a[0]
+ # The error can also be wrapped as args[1]:
+ # except socket.error as msg:
+ # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
+ elif len(a) >= 2 and isinstance(a[1], IOError):
+ err = a[1]
+ else:
+ break
+ filter_error(err)
+ raise
+ # XXX should we catch generic exceptions and look for their
+ # __cause__ or __context__?
+ finally:
+ socket.setdefaulttimeout(old_timeout)
+
+
+@contextlib.contextmanager
+def captured_output(stream_name):
+ """Return a context manager used by captured_stdout and captured_stdin
+ that temporarily replaces the sys stream *stream_name* with a StringIO."""
+ import StringIO
+ orig_stdout = getattr(sys, stream_name)
+ setattr(sys, stream_name, StringIO.StringIO())
+ try:
+ yield getattr(sys, stream_name)
+ finally:
+ setattr(sys, stream_name, orig_stdout)
+
+def captured_stdout():
+ """Capture the output of sys.stdout:
+
+ with captured_stdout() as s:
+ print "hello"
+ self.assertEqual(s.getvalue(), "hello")
+ """
+ return captured_output("stdout")
+
+def captured_stderr():
+ return captured_output("stderr")
+
+def captured_stdin():
+ return captured_output("stdin")
+
+def gc_collect():
+ """Force as many objects as possible to be collected.
+
+ In non-CPython implementations of Python, this is needed because timely
+ deallocation is not guaranteed by the garbage collector. (Even in CPython
+ this can be the case in case of reference cycles.) This means that __del__
+ methods may be called later than expected and weakrefs may remain alive for
+ longer than expected. This function tries its best to force all garbage
+ objects to disappear.
+ """
+ gc.collect()
+ if is_jython:
+ time.sleep(0.1)
+ gc.collect()
+ gc.collect()
+
+
+_header = '2P'
+if hasattr(sys, "gettotalrefcount"):
+ _header = '2P' + _header
+_vheader = _header + 'P'
+
+def calcobjsize(fmt):
+ return struct.calcsize(_header + fmt + '0P')
+
+def calcvobjsize(fmt):
+ return struct.calcsize(_vheader + fmt + '0P')
+
+
+_TPFLAGS_HAVE_GC = 1<<14
+_TPFLAGS_HEAPTYPE = 1<<9
+
+def check_sizeof(test, o, size):
+ import _testcapi
+ result = sys.getsizeof(o)
+ # add GC header size
+ if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
+ ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
+ size += _testcapi.SIZEOF_PYGC_HEAD
+ msg = 'wrong size for %s: got %d, expected %d' \
+ % (type(o), result, size)
+ test.assertEqual(result, size, msg)
+
+
+#=======================================================================
+# Decorator for running a function in a different locale, correctly resetting
+# it afterwards.
+
+def run_with_locale(catstr, *locales):
+ def decorator(func):
+ def inner(*args, **kwds):
+ try:
+ import locale
+ category = getattr(locale, catstr)
+ orig_locale = locale.setlocale(category)
+ except AttributeError:
+ # if the test author gives us an invalid category string
+ raise
+ except:
+ # cannot retrieve original locale, so do nothing
+ locale = orig_locale = None
+ else:
+ for loc in locales:
+ try:
+ locale.setlocale(category, loc)
+ break
+ except:
+ pass
+
+ # now run the function, resetting the locale on exceptions
+ try:
+ return func(*args, **kwds)
+ finally:
+ if locale and orig_locale:
+ locale.setlocale(category, orig_locale)
+ inner.func_name = func.func_name
+ inner.__doc__ = func.__doc__
+ return inner
+ return decorator
+
+#=======================================================================
+# Decorator for running a function in a specific timezone, correctly
+# resetting it afterwards.
+
+def run_with_tz(tz):
+ def decorator(func):
+ def inner(*args, **kwds):
+ try:
+ tzset = time.tzset
+ except AttributeError:
+ raise unittest.SkipTest("tzset required")
+ if 'TZ' in os.environ:
+ orig_tz = os.environ['TZ']
+ else:
+ orig_tz = None
+ os.environ['TZ'] = tz
+ tzset()
+
+ # now run the function, resetting the tz on exceptions
+ try:
+ return func(*args, **kwds)
+ finally:
+ if orig_tz is None:
+ del os.environ['TZ']
+ else:
+ os.environ['TZ'] = orig_tz
+ time.tzset()
+
+ inner.__name__ = func.__name__
+ inner.__doc__ = func.__doc__
+ return inner
+ return decorator
+
+#=======================================================================
+# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
+
+# Some handy shorthands. Note that these are used for byte-limits as well
+# as size-limits, in the various bigmem tests
+_1M = 1024*1024
+_1G = 1024 * _1M
+_2G = 2 * _1G
+_4G = 4 * _1G
+
+MAX_Py_ssize_t = sys.maxsize
+
+def set_memlimit(limit):
+ global max_memuse
+ global real_max_memuse
+ sizes = {
+ 'k': 1024,
+ 'm': _1M,
+ 'g': _1G,
+ 't': 1024*_1G,
+ }
+ m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
+ re.IGNORECASE | re.VERBOSE)
+ if m is None:
+ raise ValueError('Invalid memory limit %r' % (limit,))
+ memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
+ real_max_memuse = memlimit
+ if memlimit > MAX_Py_ssize_t:
+ memlimit = MAX_Py_ssize_t
+ if memlimit < _2G - 1:
+ raise ValueError('Memory limit %r too low to be useful' % (limit,))
+ max_memuse = memlimit
+
+def bigmemtest(minsize, memuse, overhead=5*_1M):
+ """Decorator for bigmem tests.
+
+ 'minsize' is the minimum useful size for the test (in arbitrary,
+ test-interpreted units.) 'memuse' is the number of 'bytes per size' for
+ the test, or a good estimate of it. 'overhead' specifies fixed overhead,
+ independent of the testsize, and defaults to 5Mb.
+
+ The decorator tries to guess a good value for 'size' and passes it to
+ the decorated test function. If minsize * memuse is more than the
+ allowed memory use (as defined by max_memuse), the test is skipped.
+ Otherwise, minsize is adjusted upward to use up to max_memuse.
+ """
+ def decorator(f):
+ def wrapper(self):
+ if not max_memuse:
+ # If max_memuse is 0 (the default),
+ # we still want to run the tests with size set to a few kb,
+ # to make sure they work. We still want to avoid using
+ # too much memory, though, but we do that noisily.
+ maxsize = 5147
+ self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
+ else:
+ maxsize = int((max_memuse - overhead) / memuse)
+ if maxsize < minsize:
+ # Really ought to print 'test skipped' or something
+ if verbose:
+ sys.stderr.write("Skipping %s because of memory "
+ "constraint\n" % (f.__name__,))
+ return
+ # Try to keep some breathing room in memory use
+ maxsize = max(maxsize - 50 * _1M, minsize)
+ return f(self, maxsize)
+ wrapper.minsize = minsize
+ wrapper.memuse = memuse
+ wrapper.overhead = overhead
+ return wrapper
+ return decorator
+
+def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
+ def decorator(f):
+ def wrapper(self):
+ if not real_max_memuse:
+ maxsize = 5147
+ else:
+ maxsize = size
+
+ if ((real_max_memuse or not dry_run)
+ and real_max_memuse < maxsize * memuse):
+ if verbose:
+ sys.stderr.write("Skipping %s because of memory "
+ "constraint\n" % (f.__name__,))
+ return
+
+ return f(self, maxsize)
+ wrapper.size = size
+ wrapper.memuse = memuse
+ wrapper.overhead = overhead
+ return wrapper
+ return decorator
+
+def bigaddrspacetest(f):
+ """Decorator for tests that fill the address space."""
+ def wrapper(self):
+ if max_memuse < MAX_Py_ssize_t:
+ if verbose:
+ sys.stderr.write("Skipping %s because of memory "
+ "constraint\n" % (f.__name__,))
+ else:
+ return f(self)
+ return wrapper
+
+#=======================================================================
+# unittest integration.
+
+class BasicTestRunner:
+ def run(self, test):
+ result = unittest.TestResult()
+ test(result)
+ return result
+
+def _id(obj):
+ return obj
+
+def requires_resource(resource):
+ if resource == 'gui' and not _is_gui_available():
+ return unittest.skip(_is_gui_available.reason)
+ if is_resource_enabled(resource):
+ return _id
+ else:
+ return unittest.skip("resource {0!r} is not enabled".format(resource))
+
+def cpython_only(test):
+ """
+ Decorator for tests only applicable on CPython.
+ """
+ return impl_detail(cpython=True)(test)
+
+def impl_detail(msg=None, **guards):
+ if check_impl_detail(**guards):
+ return _id
+ if msg is None:
+ guardnames, default = _parse_guards(guards)
+ if default:
+ msg = "implementation detail not available on {0}"
+ else:
+ msg = "implementation detail specific to {0}"
+ guardnames = sorted(guardnames.keys())
+ msg = msg.format(' or '.join(guardnames))
+ return unittest.skip(msg)
+
+def _parse_guards(guards):
+ # Returns a tuple ({platform_name: run_me}, default_value)
+ if not guards:
+ return ({'cpython': True}, False)
+ is_true = guards.values()[0]
+ assert guards.values() == [is_true] * len(guards) # all True or all False
+ return (guards, not is_true)
+
+# Use the following check to guard CPython's implementation-specific tests --
+# or to run them only on the implementation(s) guarded by the arguments.
+def check_impl_detail(**guards):
+ """This function returns True or False depending on the host platform.
+ Examples:
+ if check_impl_detail(): # only on CPython (default)
+ if check_impl_detail(jython=True): # only on Jython
+ if check_impl_detail(cpython=False): # everywhere except on CPython
+ """
+ guards, default = _parse_guards(guards)
+ return guards.get(platform.python_implementation().lower(), default)
+
+
+
+def _run_suite(suite):
+ """Run tests from a unittest.TestSuite-derived class."""
+ if verbose:
+ runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
+ else:
+ runner = BasicTestRunner()
+
+ result = runner.run(suite)
+ if not result.wasSuccessful():
+ if len(result.errors) == 1 and not result.failures:
+ err = result.errors[0][1]
+ elif len(result.failures) == 1 and not result.errors:
+ err = result.failures[0][1]
+ else:
+ err = "multiple errors occurred"
+ if not verbose:
+ err += "; run in verbose mode for details"
+ raise TestFailed(err)
+
+
+def run_unittest(*classes):
+ """Run tests from unittest.TestCase-derived classes."""
+ valid_types = (unittest.TestSuite, unittest.TestCase)
+ suite = unittest.TestSuite()
+ for cls in classes:
+ if isinstance(cls, str):
+ if cls in sys.modules:
+ suite.addTest(unittest.findTestCases(sys.modules[cls]))
+ else:
+ raise ValueError("str arguments must be keys in sys.modules")
+ elif isinstance(cls, valid_types):
+ suite.addTest(cls)
+ else:
+ suite.addTest(unittest.makeSuite(cls))
+ _run_suite(suite)
+
+#=======================================================================
+# Check for the presence of docstrings.
+
+HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
+ sys.platform == 'win32' or
+ sysconfig.get_config_var('WITH_DOC_STRINGS'))
+
+requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
+ "test requires docstrings")
+
+
+#=======================================================================
+# doctest driver.
+
+def run_doctest(module, verbosity=None):
+ """Run doctest on the given module. Return (#failures, #tests).
+
+ If optional argument verbosity is not specified (or is None), pass
+ test.support's belief about verbosity on to doctest. Else doctest's
+ usual behavior is used (it searches sys.argv for -v).
+ """
+
+ import doctest
+
+ if verbosity is None:
+ verbosity = verbose
+ else:
+ verbosity = None
+
+ # Direct doctest output (normally just errors) to real stdout; doctest
+ # output shouldn't be compared by regrtest.
+ save_stdout = sys.stdout
+ sys.stdout = get_original_stdout()
+ try:
+ f, t = doctest.testmod(module, verbose=verbosity)
+ if f:
+ raise TestFailed("%d of %d doctests failed" % (f, t))
+ finally:
+ sys.stdout = save_stdout
+ if verbose:
+ print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
+ return f, t
+
+#=======================================================================
+# Threading support to prevent reporting refleaks when running regrtest.py -R
+
+# NOTE: we use thread._count() rather than threading.enumerate() (or the
+# moral equivalent thereof) because a threading.Thread object is still alive
+# until its __bootstrap() method has returned, even after it has been
+# unregistered from the threading module.
+# thread._count(), on the other hand, only gets decremented *after* the
+# __bootstrap() method has returned, which gives us reliable reference counts
+# at the end of a test run.
+
+def threading_setup():
+ if thread:
+ return thread._count(),
+ else:
+ return 1,
+
+def threading_cleanup(nb_threads):
+ if not thread:
+ return
+
+ _MAX_COUNT = 10
+ for count in range(_MAX_COUNT):
+ n = thread._count()
+ if n == nb_threads:
+ break
+ time.sleep(0.1)
+ # XXX print a warning in case of failure?
+
+def reap_threads(func):
+ """Use this function when threads are being used. This will
+ ensure that the threads are cleaned up even when the test fails.
+ If threading is unavailable this function does nothing.
+ """
+ if not thread:
+ return func
+
+ @functools.wraps(func)
+ def decorator(*args):
+ key = threading_setup()
+ try:
+ return func(*args)
+ finally:
+ threading_cleanup(*key)
+ return decorator
+
+def reap_children():
+ """Use this function at the end of test_main() whenever sub-processes
+ are started. This will help ensure that no extra children (zombies)
+ stick around to hog resources and create problems when looking
+ for refleaks.
+ """
+
+ # Reap all our dead child processes so we don't leave zombies around.
+ # These hog resources and might be causing some of the buildbots to die.
+ if hasattr(os, 'waitpid'):
+ any_process = -1
+ while True:
+ try:
+ # This will raise an exception on Windows. That's ok.
+ pid, status = os.waitpid(any_process, os.WNOHANG)
+ if pid == 0:
+ break
+ except:
+ break
+
+@contextlib.contextmanager
+def start_threads(threads, unlock=None):
+ threads = list(threads)
+ started = []
+ try:
+ try:
+ for t in threads:
+ t.start()
+ started.append(t)
+ except:
+ if verbose:
+ print("Can't start %d threads, only %d threads started" %
+ (len(threads), len(started)))
+ raise
+ yield
+ finally:
+ if unlock:
+ unlock()
+ endtime = starttime = time.time()
+ for timeout in range(1, 16):
+ endtime += 60
+ for t in started:
+ t.join(max(endtime - time.time(), 0.01))
+ started = [t for t in started if t.isAlive()]
+ if not started:
+ break
+ if verbose:
+ print('Unable to join %d threads during a period of '
+ '%d minutes' % (len(started), timeout))
+ started = [t for t in started if t.isAlive()]
+ if started:
+ raise AssertionError('Unable to join %d threads' % len(started))
+
+@contextlib.contextmanager
+def swap_attr(obj, attr, new_val):
+ """Temporary swap out an attribute with a new object.
+
+ Usage:
+ with swap_attr(obj, "attr", 5):
+ ...
+
+ This will set obj.attr to 5 for the duration of the with: block,
+ restoring the old value at the end of the block. If `attr` doesn't
+ exist on `obj`, it will be created and then deleted at the end of the
+ block.
+
+ The old value (or None if it doesn't exist) will be assigned to the
+ target of the "as" clause, if there is one.
+ """
+ if hasattr(obj, attr):
+ real_val = getattr(obj, attr)
+ setattr(obj, attr, new_val)
+ try:
+ yield real_val
+ finally:
+ setattr(obj, attr, real_val)
+ else:
+ setattr(obj, attr, new_val)
+ try:
+ yield
+ finally:
+ if hasattr(obj, attr):
+ delattr(obj, attr)
+
+@contextlib.contextmanager
+def swap_item(obj, item, new_val):
+ """Temporary swap out an item with a new object.
+
+ Usage:
+ with swap_item(obj, "item", 5):
+ ...
+
+ This will set obj["item"] to 5 for the duration of the with: block,
+ restoring the old value at the end of the block. If `item` doesn't
+ exist on `obj`, it will be created and then deleted at the end of the
+ block.
+
+ The old value (or None if it doesn't exist) will be assigned to the
+ target of the "as" clause, if there is one.
+ """
+ if item in obj:
+ real_val = obj[item]
+ obj[item] = new_val
+ try:
+ yield real_val
+ finally:
+ obj[item] = real_val
+ else:
+ obj[item] = new_val
+ try:
+ yield
+ finally:
+ if item in obj:
+ del obj[item]
+
+def py3k_bytes(b):
+ """Emulate the py3k bytes() constructor.
+
+ NOTE: This is only a best effort function.
+ """
+ try:
+ # memoryview?
+ return b.tobytes()
+ except AttributeError:
+ try:
+ # iterable of ints?
+ return b"".join(chr(x) for x in b)
+ except TypeError:
+ return bytes(b)
+
+def args_from_interpreter_flags():
+ """Return a list of command-line arguments reproducing the current
+ settings in sys.flags."""
+ import subprocess
+ return subprocess._args_from_interpreter_flags()
+
+def strip_python_stderr(stderr):
+ """Strip the stderr of a Python process from potential debug output
+ emitted by the interpreter.
+
+ This will typically be run on the result of the communicate() method
+ of a subprocess.Popen object.
+ """
+ stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
+ return stderr
+
+
+def check_free_after_iterating(test, iter, cls, args=()):
+ class A(cls):
+ def __del__(self):
+ done[0] = True
+ try:
+ next(it)
+ except StopIteration:
+ pass
+
+ done = [False]
+ it = iter(A(*args))
+ # Issue 26494: Shouldn't crash
+ test.assertRaises(StopIteration, next, it)
+ # The sequence should be deallocated just after the end of iterating
+ gc_collect()
+ test.assertTrue(done[0])
+
+@contextlib.contextmanager
+def disable_gc():
+ have_gc = gc.isenabled()
+ gc.disable()
+ try:
+ yield
+ finally:
+ if have_gc:
+ gc.enable()
--- /dev/null
+# Common utility functions used by various script execution tests
+# e.g. test_cmd_line, test_cmd_line_script and test_runpy
+
+import sys
+import os
+import re
+import os.path
+import tempfile
+import subprocess
+import py_compile
+import contextlib
+import shutil
+try:
+ import zipfile
+except ImportError:
+ # If Python is build without Unicode support, importing _io will
+ # fail, which, in turn, means that zipfile cannot be imported
+ # Most of this module can then still be used.
+ pass
+
+from test.support import strip_python_stderr
+
+# Executing the interpreter in a subprocess
+def _assert_python(expected_success, *args, **env_vars):
+ cmd_line = [sys.executable]
+ if not env_vars:
+ cmd_line.append('-E')
+ cmd_line.extend(args)
+ # Need to preserve the original environment, for in-place testing of
+ # shared library builds.
+ env = os.environ.copy()
+ env.update(env_vars)
+ p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ env=env)
+ try:
+ out, err = p.communicate()
+ finally:
+ subprocess._cleanup()
+ p.stdout.close()
+ p.stderr.close()
+ rc = p.returncode
+ err = strip_python_stderr(err)
+ if (rc and expected_success) or (not rc and not expected_success):
+ raise AssertionError(
+ "Process return code is %d, "
+ "stderr follows:\n%s" % (rc, err.decode('ascii', 'ignore')))
+ return rc, out, err
+
+def assert_python_ok(*args, **env_vars):
+ """
+ Assert that running the interpreter with `args` and optional environment
+ variables `env_vars` is ok and return a (return code, stdout, stderr) tuple.
+ """
+ return _assert_python(True, *args, **env_vars)
+
+def assert_python_failure(*args, **env_vars):
+ """
+ Assert that running the interpreter with `args` and optional environment
+ variables `env_vars` fails and return a (return code, stdout, stderr) tuple.
+ """
+ return _assert_python(False, *args, **env_vars)
+
+def python_exit_code(*args):
+ cmd_line = [sys.executable, '-E']
+ cmd_line.extend(args)
+ with open(os.devnull, 'w') as devnull:
+ return subprocess.call(cmd_line, stdout=devnull,
+ stderr=subprocess.STDOUT)
+
+def spawn_python(*args, **kwargs):
+ cmd_line = [sys.executable, '-E']
+ cmd_line.extend(args)
+ return subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+ **kwargs)
+
+def kill_python(p):
+ p.stdin.close()
+ data = p.stdout.read()
+ p.stdout.close()
+ # try to cleanup the child so we don't appear to leak when running
+ # with regrtest -R.
+ p.wait()
+ subprocess._cleanup()
+ return data
+
+def run_python(*args, **kwargs):
+ if __debug__:
+ p = spawn_python(*args, **kwargs)
+ else:
+ p = spawn_python('-O', *args, **kwargs)
+ stdout_data = kill_python(p)
+ return p.wait(), stdout_data
+
+# Script creation utilities
+@contextlib.contextmanager
+def temp_dir():
+ dirname = tempfile.mkdtemp()
+ dirname = os.path.realpath(dirname)
+ try:
+ yield dirname
+ finally:
+ shutil.rmtree(dirname)
+
+def make_script(script_dir, script_basename, source):
+ script_filename = script_basename+os.extsep+'py'
+ script_name = os.path.join(script_dir, script_filename)
+ script_file = open(script_name, 'w')
+ script_file.write(source)
+ script_file.close()
+ return script_name
+
+def compile_script(script_name):
+ py_compile.compile(script_name, doraise=True)
+ if __debug__:
+ compiled_name = script_name + 'c'
+ else:
+ compiled_name = script_name + 'o'
+ return compiled_name
+
+def make_zip_script(zip_dir, zip_basename, script_name, name_in_zip=None):
+ zip_filename = zip_basename+os.extsep+'zip'
+ zip_name = os.path.join(zip_dir, zip_filename)
+ zip_file = zipfile.ZipFile(zip_name, 'w')
+ if name_in_zip is None:
+ name_in_zip = os.path.basename(script_name)
+ zip_file.write(script_name, name_in_zip)
+ zip_file.close()
+ #if test.test_support.verbose:
+ # zip_file = zipfile.ZipFile(zip_name, 'r')
+ # print 'Contents of %r:' % zip_name
+ # zip_file.printdir()
+ # zip_file.close()
+ return zip_name, os.path.join(zip_name, name_in_zip)
+
+def make_pkg(pkg_dir, init_source=''):
+ os.mkdir(pkg_dir)
+ make_script(pkg_dir, '__init__', init_source)
+
+def make_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
+ source, depth=1, compiled=False):
+ unlink = []
+ init_name = make_script(zip_dir, '__init__', '')
+ unlink.append(init_name)
+ init_basename = os.path.basename(init_name)
+ script_name = make_script(zip_dir, script_basename, source)
+ unlink.append(script_name)
+ if compiled:
+ init_name = compile_script(init_name)
+ script_name = compile_script(script_name)
+ unlink.extend((init_name, script_name))
+ pkg_names = [os.sep.join([pkg_name]*i) for i in range(1, depth+1)]
+ script_name_in_zip = os.path.join(pkg_names[-1], os.path.basename(script_name))
+ zip_filename = zip_basename+os.extsep+'zip'
+ zip_name = os.path.join(zip_dir, zip_filename)
+ zip_file = zipfile.ZipFile(zip_name, 'w')
+ for name in pkg_names:
+ init_name_in_zip = os.path.join(name, init_basename)
+ zip_file.write(init_name, init_name_in_zip)
+ zip_file.write(script_name, script_name_in_zip)
+ zip_file.close()
+ for name in unlink:
+ os.unlink(name)
+ #if test.test_support.verbose:
+ # zip_file = zipfile.ZipFile(zip_name, 'r')
+ # print 'Contents of %r:' % zip_name
+ # zip_file.printdir()
+ # zip_file.close()
+ return zip_name, os.path.join(zip_name, script_name_in_zip)
# warning: if 'os' or 'test_support' are moved in some other dir,
# they should be changed here.
libdir = os.path.dirname(os.__file__)
- testdir = os.path.dirname(test.test_support.__file__)
+ testdir = test.test_support.TEST_HOME_DIR
for dir in [testdir]:
for basename in "test_os.py",:
import linecache
import unittest
import os.path
-from test import test_support as support
+from test import support
FILENAME = linecache.__file__
EMPTY = ''
TESTS = 'inspect_fodder inspect_fodder2 mapping_tests'
TESTS = TESTS.split()
-TEST_PATH = os.path.dirname(support.__file__)
+TEST_PATH = support.TEST_HOME_DIR
MODULES = "linecache abc".split()
MODULE_PATH = os.path.dirname(FILENAME)
-"""Supporting definitions for the Python regression tests."""
-
-if __name__ != 'test.test_support':
- raise ImportError('test_support must be imported from the test package')
-
-import contextlib
-import errno
-import functools
-import gc
-import socket
-import stat
import sys
-import os
-import platform
-import shutil
-import warnings
-import unittest
-import importlib
-import UserDict
-import re
-import time
-import struct
-import sysconfig
-try:
- import thread
-except ImportError:
- thread = None
-
-__all__ = ["Error", "TestFailed", "ResourceDenied", "import_module",
- "verbose", "use_resources", "max_memuse", "record_original_stdout",
- "get_original_stdout", "unload", "unlink", "rmtree", "forget",
- "is_resource_enabled", "requires", "requires_mac_ver",
- "find_unused_port", "bind_port",
- "fcmp", "have_unicode", "is_jython", "TESTFN", "HOST", "FUZZ",
- "SAVEDCWD", "temp_cwd", "findfile", "sortdict", "check_syntax_error",
- "open_urlresource", "check_warnings", "check_py3k_warnings",
- "CleanImport", "EnvironmentVarGuard", "captured_output",
- "captured_stdout", "TransientResource", "transient_internet",
- "run_with_locale", "set_memlimit", "bigmemtest", "bigaddrspacetest",
- "BasicTestRunner", "run_unittest", "run_doctest", "threading_setup",
- "threading_cleanup", "reap_threads", "start_threads", "cpython_only",
- "check_impl_detail", "get_attribute", "py3k_bytes",
- "import_fresh_module", "threading_cleanup", "reap_children",
- "strip_python_stderr", "IPV6_ENABLED", "run_with_tz"]
-
-class Error(Exception):
- """Base class for regression test exceptions."""
-
-class TestFailed(Error):
- """Test failed."""
-
-class ResourceDenied(unittest.SkipTest):
- """Test skipped because it requested a disallowed resource.
-
- This is raised when a test calls requires() for a resource that
- has not been enabled. It is used to distinguish between expected
- and unexpected skips.
- """
-
-@contextlib.contextmanager
-def _ignore_deprecated_imports(ignore=True):
- """Context manager to suppress package and module deprecation
- warnings when importing them.
-
- If ignore is False, this context manager has no effect."""
- if ignore:
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", ".+ (module|package)",
- DeprecationWarning)
- yield
- else:
- yield
-
-
-def import_module(name, deprecated=False):
- """Import and return the module to be tested, raising SkipTest if
- it is not available.
-
- If deprecated is True, any module or package deprecation messages
- will be suppressed."""
- with _ignore_deprecated_imports(deprecated):
- try:
- return importlib.import_module(name)
- except ImportError, msg:
- raise unittest.SkipTest(str(msg))
-
-
-def _save_and_remove_module(name, orig_modules):
- """Helper function to save and remove a module from sys.modules
-
- Raise ImportError if the module can't be imported."""
- # try to import the module and raise an error if it can't be imported
- if name not in sys.modules:
- __import__(name)
- del sys.modules[name]
- for modname in list(sys.modules):
- if modname == name or modname.startswith(name + '.'):
- orig_modules[modname] = sys.modules[modname]
- del sys.modules[modname]
-
-def _save_and_block_module(name, orig_modules):
- """Helper function to save and block a module in sys.modules
-
- Return True if the module was in sys.modules, False otherwise."""
- saved = True
- try:
- orig_modules[name] = sys.modules[name]
- except KeyError:
- saved = False
- sys.modules[name] = None
- return saved
-
-
-def import_fresh_module(name, fresh=(), blocked=(), deprecated=False):
- """Imports and returns a module, deliberately bypassing the sys.modules cache
- and importing a fresh copy of the module. Once the import is complete,
- the sys.modules cache is restored to its original state.
-
- Modules named in fresh are also imported anew if needed by the import.
- If one of these modules can't be imported, None is returned.
-
- Importing of modules named in blocked is prevented while the fresh import
- takes place.
-
- If deprecated is True, any module or package deprecation messages
- will be suppressed."""
- # NOTE: test_heapq, test_json, and test_warnings include extra sanity
- # checks to make sure that this utility function is working as expected
- with _ignore_deprecated_imports(deprecated):
- # Keep track of modules saved for later restoration as well
- # as those which just need a blocking entry removed
- orig_modules = {}
- names_to_remove = []
- _save_and_remove_module(name, orig_modules)
- try:
- for fresh_name in fresh:
- _save_and_remove_module(fresh_name, orig_modules)
- for blocked_name in blocked:
- if not _save_and_block_module(blocked_name, orig_modules):
- names_to_remove.append(blocked_name)
- fresh_module = importlib.import_module(name)
- except ImportError:
- fresh_module = None
- finally:
- for orig_name, module in orig_modules.items():
- sys.modules[orig_name] = module
- for name_to_remove in names_to_remove:
- del sys.modules[name_to_remove]
- return fresh_module
-
-
-def get_attribute(obj, name):
- """Get an attribute, raising SkipTest if AttributeError is raised."""
- try:
- attribute = getattr(obj, name)
- except AttributeError:
- raise unittest.SkipTest("module %s has no attribute %s" % (
- obj.__name__, name))
- else:
- return attribute
-
-
-verbose = 1 # Flag set to 0 by regrtest.py
-use_resources = None # Flag set to [] by regrtest.py
-max_memuse = 0 # Disable bigmem tests (they will still be run with
- # small sizes, to make sure they work.)
-real_max_memuse = 0
-
-# _original_stdout is meant to hold stdout at the time regrtest began.
-# This may be "the real" stdout, or IDLE's emulation of stdout, or whatever.
-# The point is to have some flavor of stdout the user can actually see.
-_original_stdout = None
-def record_original_stdout(stdout):
- global _original_stdout
- _original_stdout = stdout
-
-def get_original_stdout():
- return _original_stdout or sys.stdout
-
-def unload(name):
- try:
- del sys.modules[name]
- except KeyError:
- pass
-
-def _force_run(path, func, *args):
- try:
- return func(*args)
- except EnvironmentError as err:
- if verbose >= 2:
- print('%s: %s' % (err.__class__.__name__, err))
- print('re-run %s%r' % (func.__name__, args))
- os.chmod(path, stat.S_IRWXU)
- return func(*args)
-
-if sys.platform.startswith("win"):
- def _waitfor(func, pathname, waitall=False):
- # Perform the operation
- func(pathname)
- # Now setup the wait loop
- if waitall:
- dirname = pathname
- else:
- dirname, name = os.path.split(pathname)
- dirname = dirname or '.'
- # Check for `pathname` to be removed from the filesystem.
- # The exponential backoff of the timeout amounts to a total
- # of ~1 second after which the deletion is probably an error
- # anyway.
- # Testing on an i7@4.3GHz shows that usually only 1 iteration is
- # required when contention occurs.
- timeout = 0.001
- while timeout < 1.0:
- # Note we are only testing for the existence of the file(s) in
- # the contents of the directory regardless of any security or
- # access rights. If we have made it this far, we have sufficient
- # permissions to do that much using Python's equivalent of the
- # Windows API FindFirstFile.
- # Other Windows APIs can fail or give incorrect results when
- # dealing with files that are pending deletion.
- L = os.listdir(dirname)
- if not (L if waitall else name in L):
- return
- # Increase the timeout and try again
- time.sleep(timeout)
- timeout *= 2
- warnings.warn('tests may fail, delete still pending for ' + pathname,
- RuntimeWarning, stacklevel=4)
-
- def _unlink(filename):
- _waitfor(os.unlink, filename)
-
- def _rmdir(dirname):
- _waitfor(os.rmdir, dirname)
-
- def _rmtree(path):
- def _rmtree_inner(path):
- for name in _force_run(path, os.listdir, path):
- fullname = os.path.join(path, name)
- if os.path.isdir(fullname):
- _waitfor(_rmtree_inner, fullname, waitall=True)
- _force_run(fullname, os.rmdir, fullname)
- else:
- _force_run(fullname, os.unlink, fullname)
- _waitfor(_rmtree_inner, path, waitall=True)
- _waitfor(lambda p: _force_run(p, os.rmdir, p), path)
-else:
- _unlink = os.unlink
- _rmdir = os.rmdir
-
- def _rmtree(path):
- try:
- shutil.rmtree(path)
- return
- except EnvironmentError:
- pass
-
- def _rmtree_inner(path):
- for name in _force_run(path, os.listdir, path):
- fullname = os.path.join(path, name)
- try:
- mode = os.lstat(fullname).st_mode
- except EnvironmentError:
- mode = 0
- if stat.S_ISDIR(mode):
- _rmtree_inner(fullname)
- _force_run(path, os.rmdir, fullname)
- else:
- _force_run(path, os.unlink, fullname)
- _rmtree_inner(path)
- os.rmdir(path)
-
-def unlink(filename):
- try:
- _unlink(filename)
- except OSError:
- pass
-
-def rmdir(dirname):
- try:
- _rmdir(dirname)
- except OSError as error:
- # The directory need not exist.
- if error.errno != errno.ENOENT:
- raise
-
-def rmtree(path):
- try:
- _rmtree(path)
- except OSError, e:
- # Unix returns ENOENT, Windows returns ESRCH.
- if e.errno not in (errno.ENOENT, errno.ESRCH):
- raise
-
-def forget(modname):
- '''"Forget" a module was ever imported by removing it from sys.modules and
- deleting any .pyc and .pyo files.'''
- unload(modname)
- for dirname in sys.path:
- unlink(os.path.join(dirname, modname + os.extsep + 'pyc'))
- # Deleting the .pyo file cannot be within the 'try' for the .pyc since
- # the chance exists that there is no .pyc (and thus the 'try' statement
- # is exited) but there is a .pyo file.
- unlink(os.path.join(dirname, modname + os.extsep + 'pyo'))
-
-# Check whether a gui is actually available
-def _is_gui_available():
- if hasattr(_is_gui_available, 'result'):
- return _is_gui_available.result
- reason = None
- if sys.platform.startswith('win'):
- # if Python is running as a service (such as the buildbot service),
- # gui interaction may be disallowed
- import ctypes
- import ctypes.wintypes
- UOI_FLAGS = 1
- WSF_VISIBLE = 0x0001
- class USEROBJECTFLAGS(ctypes.Structure):
- _fields_ = [("fInherit", ctypes.wintypes.BOOL),
- ("fReserved", ctypes.wintypes.BOOL),
- ("dwFlags", ctypes.wintypes.DWORD)]
- dll = ctypes.windll.user32
- h = dll.GetProcessWindowStation()
- if not h:
- raise ctypes.WinError()
- uof = USEROBJECTFLAGS()
- needed = ctypes.wintypes.DWORD()
- res = dll.GetUserObjectInformationW(h,
- UOI_FLAGS,
- ctypes.byref(uof),
- ctypes.sizeof(uof),
- ctypes.byref(needed))
- if not res:
- raise ctypes.WinError()
- if not bool(uof.dwFlags & WSF_VISIBLE):
- reason = "gui not available (WSF_VISIBLE flag not set)"
- elif sys.platform == 'darwin':
- # The Aqua Tk implementations on OS X can abort the process if
- # being called in an environment where a window server connection
- # cannot be made, for instance when invoked by a buildbot or ssh
- # process not running under the same user id as the current console
- # user. To avoid that, raise an exception if the window manager
- # connection is not available.
- from ctypes import cdll, c_int, pointer, Structure
- from ctypes.util import find_library
-
- app_services = cdll.LoadLibrary(find_library("ApplicationServices"))
-
- if app_services.CGMainDisplayID() == 0:
- reason = "gui tests cannot run without OS X window manager"
- else:
- class ProcessSerialNumber(Structure):
- _fields_ = [("highLongOfPSN", c_int),
- ("lowLongOfPSN", c_int)]
- psn = ProcessSerialNumber()
- psn_p = pointer(psn)
- if ( (app_services.GetCurrentProcess(psn_p) < 0) or
- (app_services.SetFrontProcess(psn_p) < 0) ):
- reason = "cannot run without OS X gui process"
-
- # check on every platform whether tkinter can actually do anything
- if not reason:
- try:
- from Tkinter import Tk
- root = Tk()
- root.withdraw()
- root.update()
- root.destroy()
- except Exception as e:
- err_string = str(e)
- if len(err_string) > 50:
- err_string = err_string[:50] + ' [...]'
- reason = 'Tk unavailable due to {}: {}'.format(type(e).__name__,
- err_string)
-
- _is_gui_available.reason = reason
- _is_gui_available.result = not reason
-
- return _is_gui_available.result
-
-def is_resource_enabled(resource):
- """Test whether a resource is enabled.
-
- Known resources are set by regrtest.py. If not running under regrtest.py,
- all resources are assumed enabled unless use_resources has been set.
- """
- return use_resources is None or resource in use_resources
-
-def requires(resource, msg=None):
- """Raise ResourceDenied if the specified resource is not available."""
- if not is_resource_enabled(resource):
- if msg is None:
- msg = "Use of the `%s' resource not enabled" % resource
- raise ResourceDenied(msg)
- if resource == 'gui' and not _is_gui_available():
- raise ResourceDenied(_is_gui_available.reason)
-
-def requires_mac_ver(*min_version):
- """Decorator raising SkipTest if the OS is Mac OS X and the OS X
- version if less than min_version.
-
- For example, @requires_mac_ver(10, 5) raises SkipTest if the OS X version
- is lesser than 10.5.
- """
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*args, **kw):
- if sys.platform == 'darwin':
- version_txt = platform.mac_ver()[0]
- try:
- version = tuple(map(int, version_txt.split('.')))
- except ValueError:
- pass
- else:
- if version < min_version:
- min_version_txt = '.'.join(map(str, min_version))
- raise unittest.SkipTest(
- "Mac OS X %s or higher required, not %s"
- % (min_version_txt, version_txt))
- return func(*args, **kw)
- wrapper.min_version = min_version
- return wrapper
- return decorator
-
-
-# Don't use "localhost", since resolving it uses the DNS under recent
-# Windows versions (see issue #18792).
-HOST = "127.0.0.1"
-HOSTv6 = "::1"
-
-
-def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
- """Returns an unused port that should be suitable for binding. This is
- achieved by creating a temporary socket with the same family and type as
- the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
- the specified host address (defaults to 0.0.0.0) with the port set to 0,
- eliciting an unused ephemeral port from the OS. The temporary socket is
- then closed and deleted, and the ephemeral port is returned.
-
- Either this method or bind_port() should be used for any tests where a
- server socket needs to be bound to a particular port for the duration of
- the test. Which one to use depends on whether the calling code is creating
- a python socket, or if an unused port needs to be provided in a constructor
- or passed to an external program (i.e. the -accept argument to openssl's
- s_server mode). Always prefer bind_port() over find_unused_port() where
- possible. Hard coded ports should *NEVER* be used. As soon as a server
- socket is bound to a hard coded port, the ability to run multiple instances
- of the test simultaneously on the same host is compromised, which makes the
- test a ticking time bomb in a buildbot environment. On Unix buildbots, this
- may simply manifest as a failed test, which can be recovered from without
- intervention in most cases, but on Windows, the entire python process can
- completely and utterly wedge, requiring someone to log in to the buildbot
- and manually kill the affected process.
-
- (This is easy to reproduce on Windows, unfortunately, and can be traced to
- the SO_REUSEADDR socket option having different semantics on Windows versus
- Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
- listen and then accept connections on identical host/ports. An EADDRINUSE
- socket.error will be raised at some point (depending on the platform and
- the order bind and listen were called on each socket).
-
- However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
- will ever be raised when attempting to bind two identical host/ports. When
- accept() is called on each socket, the second caller's process will steal
- the port from the first caller, leaving them both in an awkwardly wedged
- state where they'll no longer respond to any signals or graceful kills, and
- must be forcibly killed via OpenProcess()/TerminateProcess().
-
- The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
- instead of SO_REUSEADDR, which effectively affords the same semantics as
- SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
- Source world compared to Windows ones, this is a common mistake. A quick
- look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
- openssl.exe is called with the 's_server' option, for example. See
- http://bugs.python.org/issue2550 for more info. The following site also
- has a very thorough description about the implications of both REUSEADDR
- and EXCLUSIVEADDRUSE on Windows:
- http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
-
- XXX: although this approach is a vast improvement on previous attempts to
- elicit unused ports, it rests heavily on the assumption that the ephemeral
- port returned to us by the OS won't immediately be dished back out to some
- other process when we close and delete our temporary socket but before our
- calling code has a chance to bind the returned port. We can deal with this
- issue if/when we come across it."""
- tempsock = socket.socket(family, socktype)
- port = bind_port(tempsock)
- tempsock.close()
- del tempsock
- return port
-
-def bind_port(sock, host=HOST):
- """Bind the socket to a free port and return the port number. Relies on
- ephemeral ports in order to ensure we are using an unbound port. This is
- important as many tests may be running simultaneously, especially in a
- buildbot environment. This method raises an exception if the sock.family
- is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
- or SO_REUSEPORT set on it. Tests should *never* set these socket options
- for TCP/IP sockets. The only case for setting these options is testing
- multicasting via multiple UDP sockets.
-
- Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
- on Windows), it will be set on the socket. This will prevent anyone else
- from bind()'ing to our host/port for the duration of the test.
- """
- if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
- if hasattr(socket, 'SO_REUSEADDR'):
- if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
- raise TestFailed("tests should never set the SO_REUSEADDR " \
- "socket option on TCP/IP sockets!")
- if hasattr(socket, 'SO_REUSEPORT'):
- try:
- if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
- raise TestFailed("tests should never set the SO_REUSEPORT " \
- "socket option on TCP/IP sockets!")
- except EnvironmentError:
- # Python's socket module was compiled using modern headers
- # thus defining SO_REUSEPORT but this process is running
- # under an older kernel that does not support SO_REUSEPORT.
- pass
- if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
-
- sock.bind((host, 0))
- port = sock.getsockname()[1]
- return port
-
-def _is_ipv6_enabled():
- """Check whether IPv6 is enabled on this host."""
- if socket.has_ipv6:
- sock = None
- try:
- sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- sock.bind((HOSTv6, 0))
- return True
- except socket.error:
- pass
- finally:
- if sock:
- sock.close()
- return False
-
-IPV6_ENABLED = _is_ipv6_enabled()
-
-def system_must_validate_cert(f):
- """Skip the test on TLS certificate validation failures."""
- @functools.wraps(f)
- def dec(*args, **kwargs):
- try:
- f(*args, **kwargs)
- except IOError as e:
- if "CERTIFICATE_VERIFY_FAILED" in str(e):
- raise unittest.SkipTest("system does not contain "
- "necessary certificates")
- raise
- return dec
-
-FUZZ = 1e-6
-
-def fcmp(x, y): # fuzzy comparison function
- if isinstance(x, float) or isinstance(y, float):
- try:
- fuzz = (abs(x) + abs(y)) * FUZZ
- if abs(x-y) <= fuzz:
- return 0
- except:
- pass
- elif type(x) == type(y) and isinstance(x, (tuple, list)):
- for i in range(min(len(x), len(y))):
- outcome = fcmp(x[i], y[i])
- if outcome != 0:
- return outcome
- return (len(x) > len(y)) - (len(x) < len(y))
- return (x > y) - (x < y)
-
-
-# A constant likely larger than the underlying OS pipe buffer size, to
-# make writes blocking.
-# Windows limit seems to be around 512 B, and many Unix kernels have a
-# 64 KiB pipe buffer size or 16 * PAGE_SIZE: take a few megs to be sure.
-# (see issue #17835 for a discussion of this number).
-PIPE_MAX_SIZE = 4 * 1024 * 1024 + 1
-
-# A constant likely larger than the underlying OS socket buffer size, to make
-# writes blocking.
-# The socket buffer sizes can usually be tuned system-wide (e.g. through sysctl
-# on Linux), or on a per-socket basis (SO_SNDBUF/SO_RCVBUF). See issue #18643
-# for a discussion of this number).
-SOCK_MAX_SIZE = 16 * 1024 * 1024 + 1
-
-is_jython = sys.platform.startswith('java')
-
-try:
- unicode
- have_unicode = True
-except NameError:
- have_unicode = False
-
-requires_unicode = unittest.skipUnless(have_unicode, 'no unicode support')
-
-def u(s):
- return unicode(s, 'unicode-escape')
-
-# FS_NONASCII: non-ASCII Unicode character encodable by
-# sys.getfilesystemencoding(), or None if there is no such character.
-FS_NONASCII = None
-if have_unicode:
- for character in (
- # First try printable and common characters to have a readable filename.
- # For each character, the encoding list are just example of encodings able
- # to encode the character (the list is not exhaustive).
-
- # U+00E6 (Latin Small Letter Ae): cp1252, iso-8859-1
- unichr(0x00E6),
- # U+0130 (Latin Capital Letter I With Dot Above): cp1254, iso8859_3
- unichr(0x0130),
- # U+0141 (Latin Capital Letter L With Stroke): cp1250, cp1257
- unichr(0x0141),
- # U+03C6 (Greek Small Letter Phi): cp1253
- unichr(0x03C6),
- # U+041A (Cyrillic Capital Letter Ka): cp1251
- unichr(0x041A),
- # U+05D0 (Hebrew Letter Alef): Encodable to cp424
- unichr(0x05D0),
- # U+060C (Arabic Comma): cp864, cp1006, iso8859_6, mac_arabic
- unichr(0x060C),
- # U+062A (Arabic Letter Teh): cp720
- unichr(0x062A),
- # U+0E01 (Thai Character Ko Kai): cp874
- unichr(0x0E01),
-
- # Then try more "special" characters. "special" because they may be
- # interpreted or displayed differently depending on the exact locale
- # encoding and the font.
-
- # U+00A0 (No-Break Space)
- unichr(0x00A0),
- # U+20AC (Euro Sign)
- unichr(0x20AC),
- ):
- try:
- character.encode(sys.getfilesystemencoding())\
- .decode(sys.getfilesystemencoding())
- except UnicodeError:
- pass
- else:
- FS_NONASCII = character
- break
-
-# Filename used for testing
-if os.name == 'java':
- # Jython disallows @ in module names
- TESTFN = '$test'
-elif os.name == 'riscos':
- TESTFN = 'testfile'
-else:
- TESTFN = '@test'
- # Unicode name only used if TEST_FN_ENCODING exists for the platform.
- if have_unicode:
- # Assuming sys.getfilesystemencoding()!=sys.getdefaultencoding()
- # TESTFN_UNICODE is a filename that can be encoded using the
- # file system encoding, but *not* with the default (ascii) encoding
- if isinstance('', unicode):
- # python -U
- # XXX perhaps unicode() should accept Unicode strings?
- TESTFN_UNICODE = "@test-\xe0\xf2"
- else:
- # 2 latin characters.
- TESTFN_UNICODE = unicode("@test-\xe0\xf2", "latin-1")
- TESTFN_ENCODING = sys.getfilesystemencoding()
- # TESTFN_UNENCODABLE is a filename that should *not* be
- # able to be encoded by *either* the default or filesystem encoding.
- # This test really only makes sense on Windows NT platforms
- # which have special Unicode support in posixmodule.
- if (not hasattr(sys, "getwindowsversion") or
- sys.getwindowsversion()[3] < 2): # 0=win32s or 1=9x/ME
- TESTFN_UNENCODABLE = None
- else:
- # Japanese characters (I think - from bug 846133)
- TESTFN_UNENCODABLE = eval('u"@test-\u5171\u6709\u3055\u308c\u308b"')
- try:
- # XXX - Note - should be using TESTFN_ENCODING here - but for
- # Windows, "mbcs" currently always operates as if in
- # errors=ignore' mode - hence we get '?' characters rather than
- # the exception. 'Latin1' operates as we expect - ie, fails.
- # See [ 850997 ] mbcs encoding ignores errors
- TESTFN_UNENCODABLE.encode("Latin1")
- except UnicodeEncodeError:
- pass
- else:
- print \
- 'WARNING: The filename %r CAN be encoded by the filesystem. ' \
- 'Unicode filename tests may not be effective' \
- % TESTFN_UNENCODABLE
-
-
-# Disambiguate TESTFN for parallel testing, while letting it remain a valid
-# module name.
-TESTFN = "{}_{}_tmp".format(TESTFN, os.getpid())
-
-# Save the initial cwd
-SAVEDCWD = os.getcwd()
-
-@contextlib.contextmanager
-def change_cwd(path, quiet=False):
- """Return a context manager that changes the current working directory.
-
- Arguments:
-
- path: the directory to use as the temporary current working directory.
-
- quiet: if False (the default), the context manager raises an exception
- on error. Otherwise, it issues only a warning and keeps the current
- working directory the same.
-
- """
- saved_dir = os.getcwd()
- try:
- os.chdir(path)
- except OSError:
- if not quiet:
- raise
- warnings.warn('tests may fail, unable to change CWD to: ' + path,
- RuntimeWarning, stacklevel=3)
- try:
- yield os.getcwd()
- finally:
- os.chdir(saved_dir)
-
-
-@contextlib.contextmanager
-def temp_cwd(name='tempcwd', quiet=False):
- """
- Context manager that creates a temporary directory and set it as CWD.
-
- The new CWD is created in the current directory and it's named *name*.
- If *quiet* is False (default) and it's not possible to create or change
- the CWD, an error is raised. If it's True, only a warning is raised
- and the original CWD is used.
- """
- if (have_unicode and isinstance(name, unicode) and
- not os.path.supports_unicode_filenames):
- try:
- name = name.encode(sys.getfilesystemencoding() or 'ascii')
- except UnicodeEncodeError:
- if not quiet:
- raise unittest.SkipTest('unable to encode the cwd name with '
- 'the filesystem encoding.')
- saved_dir = os.getcwd()
- is_temporary = False
- try:
- os.mkdir(name)
- os.chdir(name)
- is_temporary = True
- except OSError:
- if not quiet:
- raise
- warnings.warn('tests may fail, unable to change the CWD to ' + name,
- RuntimeWarning, stacklevel=3)
- try:
- yield os.getcwd()
- finally:
- os.chdir(saved_dir)
- if is_temporary:
- rmtree(name)
-
-
-def findfile(file, here=__file__, subdir=None):
- """Try to find a file on sys.path and the working directory. If it is not
- found the argument passed to the function is returned (this does not
- necessarily signal failure; could still be the legitimate path)."""
- if os.path.isabs(file):
- return file
- if subdir is not None:
- file = os.path.join(subdir, file)
- path = sys.path
- path = [os.path.dirname(here)] + path
- for dn in path:
- fn = os.path.join(dn, file)
- if os.path.exists(fn): return fn
- return file
-
-def sortdict(dict):
- "Like repr(dict), but in sorted order."
- items = dict.items()
- items.sort()
- reprpairs = ["%r: %r" % pair for pair in items]
- withcommas = ", ".join(reprpairs)
- return "{%s}" % withcommas
-
-def make_bad_fd():
- """
- Create an invalid file descriptor by opening and closing a file and return
- its fd.
- """
- file = open(TESTFN, "wb")
- try:
- return file.fileno()
- finally:
- file.close()
- unlink(TESTFN)
-
-def check_syntax_error(testcase, statement):
- testcase.assertRaises(SyntaxError, compile, statement,
- '<test string>', 'exec')
-
-def open_urlresource(url, check=None):
- import urlparse, urllib2
-
- filename = urlparse.urlparse(url)[2].split('/')[-1] # '/': it's URL!
-
- fn = os.path.join(os.path.dirname(__file__), "data", filename)
-
- def check_valid_file(fn):
- f = open(fn)
- if check is None:
- return f
- elif check(f):
- f.seek(0)
- return f
- f.close()
-
- if os.path.exists(fn):
- f = check_valid_file(fn)
- if f is not None:
- return f
- unlink(fn)
-
- # Verify the requirement before downloading the file
- requires('urlfetch')
-
- print >> get_original_stdout(), '\tfetching %s ...' % url
- f = urllib2.urlopen(url, timeout=15)
- try:
- with open(fn, "wb") as out:
- s = f.read()
- while s:
- out.write(s)
- s = f.read()
- finally:
- f.close()
-
- f = check_valid_file(fn)
- if f is not None:
- return f
- raise TestFailed('invalid resource "%s"' % fn)
-
-
-class WarningsRecorder(object):
- """Convenience wrapper for the warnings list returned on
- entry to the warnings.catch_warnings() context manager.
- """
- def __init__(self, warnings_list):
- self._warnings = warnings_list
- self._last = 0
-
- def __getattr__(self, attr):
- if len(self._warnings) > self._last:
- return getattr(self._warnings[-1], attr)
- elif attr in warnings.WarningMessage._WARNING_DETAILS:
- return None
- raise AttributeError("%r has no attribute %r" % (self, attr))
-
- @property
- def warnings(self):
- return self._warnings[self._last:]
-
- def reset(self):
- self._last = len(self._warnings)
-
-
-def _filterwarnings(filters, quiet=False):
- """Catch the warnings, then check if all the expected
- warnings have been raised and re-raise unexpected warnings.
- If 'quiet' is True, only re-raise the unexpected warnings.
- """
- # Clear the warning registry of the calling module
- # in order to re-raise the warnings.
- frame = sys._getframe(2)
- registry = frame.f_globals.get('__warningregistry__')
- if registry:
- registry.clear()
- with warnings.catch_warnings(record=True) as w:
- # Set filter "always" to record all warnings. Because
- # test_warnings swap the module, we need to look up in
- # the sys.modules dictionary.
- sys.modules['warnings'].simplefilter("always")
- yield WarningsRecorder(w)
- # Filter the recorded warnings
- reraise = [warning.message for warning in w]
- missing = []
- for msg, cat in filters:
- seen = False
- for exc in reraise[:]:
- message = str(exc)
- # Filter out the matching messages
- if (re.match(msg, message, re.I) and
- issubclass(exc.__class__, cat)):
- seen = True
- reraise.remove(exc)
- if not seen and not quiet:
- # This filter caught nothing
- missing.append((msg, cat.__name__))
- if reraise:
- raise AssertionError("unhandled warning %r" % reraise[0])
- if missing:
- raise AssertionError("filter (%r, %s) did not catch any warning" %
- missing[0])
-
-
-@contextlib.contextmanager
-def check_warnings(*filters, **kwargs):
- """Context manager to silence warnings.
-
- Accept 2-tuples as positional arguments:
- ("message regexp", WarningCategory)
-
- Optional argument:
- - if 'quiet' is True, it does not fail if a filter catches nothing
- (default True without argument,
- default False if some filters are defined)
-
- Without argument, it defaults to:
- check_warnings(("", Warning), quiet=True)
- """
- quiet = kwargs.get('quiet')
- if not filters:
- filters = (("", Warning),)
- # Preserve backward compatibility
- if quiet is None:
- quiet = True
- return _filterwarnings(filters, quiet)
-
-
-@contextlib.contextmanager
-def check_py3k_warnings(*filters, **kwargs):
- """Context manager to silence py3k warnings.
-
- Accept 2-tuples as positional arguments:
- ("message regexp", WarningCategory)
-
- Optional argument:
- - if 'quiet' is True, it does not fail if a filter catches nothing
- (default False)
-
- Without argument, it defaults to:
- check_py3k_warnings(("", DeprecationWarning), quiet=False)
- """
- if sys.py3kwarning:
- if not filters:
- filters = (("", DeprecationWarning),)
- else:
- # It should not raise any py3k warning
- filters = ()
- return _filterwarnings(filters, kwargs.get('quiet'))
-
-
-class CleanImport(object):
- """Context manager to force import to return a new module reference.
-
- This is useful for testing module-level behaviours, such as
- the emission of a DeprecationWarning on import.
-
- Use like this:
-
- with CleanImport("foo"):
- importlib.import_module("foo") # new reference
- """
-
- def __init__(self, *module_names):
- self.original_modules = sys.modules.copy()
- for module_name in module_names:
- if module_name in sys.modules:
- module = sys.modules[module_name]
- # It is possible that module_name is just an alias for
- # another module (e.g. stub for modules renamed in 3.x).
- # In that case, we also need delete the real module to clear
- # the import cache.
- if module.__name__ != module_name:
- del sys.modules[module.__name__]
- del sys.modules[module_name]
-
- def __enter__(self):
- return self
-
- def __exit__(self, *ignore_exc):
- sys.modules.update(self.original_modules)
-
-
-class EnvironmentVarGuard(UserDict.DictMixin):
-
- """Class to help protect the environment variable properly. Can be used as
- a context manager."""
-
- def __init__(self):
- self._environ = os.environ
- self._changed = {}
-
- def __getitem__(self, envvar):
- return self._environ[envvar]
-
- def __setitem__(self, envvar, value):
- # Remember the initial value on the first access
- if envvar not in self._changed:
- self._changed[envvar] = self._environ.get(envvar)
- self._environ[envvar] = value
-
- def __delitem__(self, envvar):
- # Remember the initial value on the first access
- if envvar not in self._changed:
- self._changed[envvar] = self._environ.get(envvar)
- if envvar in self._environ:
- del self._environ[envvar]
-
- def keys(self):
- return self._environ.keys()
-
- def set(self, envvar, value):
- self[envvar] = value
-
- def unset(self, envvar):
- del self[envvar]
-
- def __enter__(self):
- return self
-
- def __exit__(self, *ignore_exc):
- for (k, v) in self._changed.items():
- if v is None:
- if k in self._environ:
- del self._environ[k]
- else:
- self._environ[k] = v
- os.environ = self._environ
-
-
-class DirsOnSysPath(object):
- """Context manager to temporarily add directories to sys.path.
-
- This makes a copy of sys.path, appends any directories given
- as positional arguments, then reverts sys.path to the copied
- settings when the context ends.
-
- Note that *all* sys.path modifications in the body of the
- context manager, including replacement of the object,
- will be reverted at the end of the block.
- """
-
- def __init__(self, *paths):
- self.original_value = sys.path[:]
- self.original_object = sys.path
- sys.path.extend(paths)
-
- def __enter__(self):
- return self
-
- def __exit__(self, *ignore_exc):
- sys.path = self.original_object
- sys.path[:] = self.original_value
-
-
-class TransientResource(object):
-
- """Raise ResourceDenied if an exception is raised while the context manager
- is in effect that matches the specified exception and attributes."""
-
- def __init__(self, exc, **kwargs):
- self.exc = exc
- self.attrs = kwargs
-
- def __enter__(self):
- return self
-
- def __exit__(self, type_=None, value=None, traceback=None):
- """If type_ is a subclass of self.exc and value has attributes matching
- self.attrs, raise ResourceDenied. Otherwise let the exception
- propagate (if any)."""
- if type_ is not None and issubclass(self.exc, type_):
- for attr, attr_value in self.attrs.iteritems():
- if not hasattr(value, attr):
- break
- if getattr(value, attr) != attr_value:
- break
- else:
- raise ResourceDenied("an optional resource is not available")
-
-
-@contextlib.contextmanager
-def transient_internet(resource_name, timeout=30.0, errnos=()):
- """Return a context manager that raises ResourceDenied when various issues
- with the Internet connection manifest themselves as exceptions."""
- default_errnos = [
- ('ECONNREFUSED', 111),
- ('ECONNRESET', 104),
- ('EHOSTUNREACH', 113),
- ('ENETUNREACH', 101),
- ('ETIMEDOUT', 110),
- ]
- default_gai_errnos = [
- ('EAI_AGAIN', -3),
- ('EAI_FAIL', -4),
- ('EAI_NONAME', -2),
- ('EAI_NODATA', -5),
- # Windows defines EAI_NODATA as 11001 but idiotic getaddrinfo()
- # implementation actually returns WSANO_DATA i.e. 11004.
- ('WSANO_DATA', 11004),
- ]
-
- denied = ResourceDenied("Resource '%s' is not available" % resource_name)
- captured_errnos = errnos
- gai_errnos = []
- if not captured_errnos:
- captured_errnos = [getattr(errno, name, num)
- for (name, num) in default_errnos]
- gai_errnos = [getattr(socket, name, num)
- for (name, num) in default_gai_errnos]
-
- def filter_error(err):
- n = getattr(err, 'errno', None)
- if (isinstance(err, socket.timeout) or
- (isinstance(err, socket.gaierror) and n in gai_errnos) or
- n in captured_errnos):
- if not verbose:
- sys.stderr.write(denied.args[0] + "\n")
- raise denied
-
- old_timeout = socket.getdefaulttimeout()
- try:
- if timeout is not None:
- socket.setdefaulttimeout(timeout)
- yield
- except IOError as err:
- # urllib can wrap original socket errors multiple times (!), we must
- # unwrap to get at the original error.
- while True:
- a = err.args
- if len(a) >= 1 and isinstance(a[0], IOError):
- err = a[0]
- # The error can also be wrapped as args[1]:
- # except socket.error as msg:
- # raise IOError('socket error', msg).with_traceback(sys.exc_info()[2])
- elif len(a) >= 2 and isinstance(a[1], IOError):
- err = a[1]
- else:
- break
- filter_error(err)
- raise
- # XXX should we catch generic exceptions and look for their
- # __cause__ or __context__?
- finally:
- socket.setdefaulttimeout(old_timeout)
-
-
-@contextlib.contextmanager
-def captured_output(stream_name):
- """Return a context manager used by captured_stdout and captured_stdin
- that temporarily replaces the sys stream *stream_name* with a StringIO."""
- import StringIO
- orig_stdout = getattr(sys, stream_name)
- setattr(sys, stream_name, StringIO.StringIO())
- try:
- yield getattr(sys, stream_name)
- finally:
- setattr(sys, stream_name, orig_stdout)
-
-def captured_stdout():
- """Capture the output of sys.stdout:
-
- with captured_stdout() as s:
- print "hello"
- self.assertEqual(s.getvalue(), "hello")
- """
- return captured_output("stdout")
-
-def captured_stderr():
- return captured_output("stderr")
-
-def captured_stdin():
- return captured_output("stdin")
-
-def gc_collect():
- """Force as many objects as possible to be collected.
-
- In non-CPython implementations of Python, this is needed because timely
- deallocation is not guaranteed by the garbage collector. (Even in CPython
- this can be the case in case of reference cycles.) This means that __del__
- methods may be called later than expected and weakrefs may remain alive for
- longer than expected. This function tries its best to force all garbage
- objects to disappear.
- """
- gc.collect()
- if is_jython:
- time.sleep(0.1)
- gc.collect()
- gc.collect()
-
-
-_header = '2P'
-if hasattr(sys, "gettotalrefcount"):
- _header = '2P' + _header
-_vheader = _header + 'P'
-
-def calcobjsize(fmt):
- return struct.calcsize(_header + fmt + '0P')
-
-def calcvobjsize(fmt):
- return struct.calcsize(_vheader + fmt + '0P')
-
-
-_TPFLAGS_HAVE_GC = 1<<14
-_TPFLAGS_HEAPTYPE = 1<<9
-
-def check_sizeof(test, o, size):
- import _testcapi
- result = sys.getsizeof(o)
- # add GC header size
- if ((type(o) == type) and (o.__flags__ & _TPFLAGS_HEAPTYPE) or\
- ((type(o) != type) and (type(o).__flags__ & _TPFLAGS_HAVE_GC))):
- size += _testcapi.SIZEOF_PYGC_HEAD
- msg = 'wrong size for %s: got %d, expected %d' \
- % (type(o), result, size)
- test.assertEqual(result, size, msg)
-
-
-#=======================================================================
-# Decorator for running a function in a different locale, correctly resetting
-# it afterwards.
-
-def run_with_locale(catstr, *locales):
- def decorator(func):
- def inner(*args, **kwds):
- try:
- import locale
- category = getattr(locale, catstr)
- orig_locale = locale.setlocale(category)
- except AttributeError:
- # if the test author gives us an invalid category string
- raise
- except:
- # cannot retrieve original locale, so do nothing
- locale = orig_locale = None
- else:
- for loc in locales:
- try:
- locale.setlocale(category, loc)
- break
- except:
- pass
-
- # now run the function, resetting the locale on exceptions
- try:
- return func(*args, **kwds)
- finally:
- if locale and orig_locale:
- locale.setlocale(category, orig_locale)
- inner.func_name = func.func_name
- inner.__doc__ = func.__doc__
- return inner
- return decorator
-
-#=======================================================================
-# Decorator for running a function in a specific timezone, correctly
-# resetting it afterwards.
-
-def run_with_tz(tz):
- def decorator(func):
- def inner(*args, **kwds):
- try:
- tzset = time.tzset
- except AttributeError:
- raise unittest.SkipTest("tzset required")
- if 'TZ' in os.environ:
- orig_tz = os.environ['TZ']
- else:
- orig_tz = None
- os.environ['TZ'] = tz
- tzset()
-
- # now run the function, resetting the tz on exceptions
- try:
- return func(*args, **kwds)
- finally:
- if orig_tz is None:
- del os.environ['TZ']
- else:
- os.environ['TZ'] = orig_tz
- time.tzset()
-
- inner.__name__ = func.__name__
- inner.__doc__ = func.__doc__
- return inner
- return decorator
-
-#=======================================================================
-# Big-memory-test support. Separate from 'resources' because memory use should be configurable.
-
-# Some handy shorthands. Note that these are used for byte-limits as well
-# as size-limits, in the various bigmem tests
-_1M = 1024*1024
-_1G = 1024 * _1M
-_2G = 2 * _1G
-_4G = 4 * _1G
-
-MAX_Py_ssize_t = sys.maxsize
-
-def set_memlimit(limit):
- global max_memuse
- global real_max_memuse
- sizes = {
- 'k': 1024,
- 'm': _1M,
- 'g': _1G,
- 't': 1024*_1G,
- }
- m = re.match(r'(\d+(\.\d+)?) (K|M|G|T)b?$', limit,
- re.IGNORECASE | re.VERBOSE)
- if m is None:
- raise ValueError('Invalid memory limit %r' % (limit,))
- memlimit = int(float(m.group(1)) * sizes[m.group(3).lower()])
- real_max_memuse = memlimit
- if memlimit > MAX_Py_ssize_t:
- memlimit = MAX_Py_ssize_t
- if memlimit < _2G - 1:
- raise ValueError('Memory limit %r too low to be useful' % (limit,))
- max_memuse = memlimit
-
-def bigmemtest(minsize, memuse, overhead=5*_1M):
- """Decorator for bigmem tests.
-
- 'minsize' is the minimum useful size for the test (in arbitrary,
- test-interpreted units.) 'memuse' is the number of 'bytes per size' for
- the test, or a good estimate of it. 'overhead' specifies fixed overhead,
- independent of the testsize, and defaults to 5Mb.
-
- The decorator tries to guess a good value for 'size' and passes it to
- the decorated test function. If minsize * memuse is more than the
- allowed memory use (as defined by max_memuse), the test is skipped.
- Otherwise, minsize is adjusted upward to use up to max_memuse.
- """
- def decorator(f):
- def wrapper(self):
- if not max_memuse:
- # If max_memuse is 0 (the default),
- # we still want to run the tests with size set to a few kb,
- # to make sure they work. We still want to avoid using
- # too much memory, though, but we do that noisily.
- maxsize = 5147
- self.assertFalse(maxsize * memuse + overhead > 20 * _1M)
- else:
- maxsize = int((max_memuse - overhead) / memuse)
- if maxsize < minsize:
- # Really ought to print 'test skipped' or something
- if verbose:
- sys.stderr.write("Skipping %s because of memory "
- "constraint\n" % (f.__name__,))
- return
- # Try to keep some breathing room in memory use
- maxsize = max(maxsize - 50 * _1M, minsize)
- return f(self, maxsize)
- wrapper.minsize = minsize
- wrapper.memuse = memuse
- wrapper.overhead = overhead
- return wrapper
- return decorator
-
-def precisionbigmemtest(size, memuse, overhead=5*_1M, dry_run=True):
- def decorator(f):
- def wrapper(self):
- if not real_max_memuse:
- maxsize = 5147
- else:
- maxsize = size
-
- if ((real_max_memuse or not dry_run)
- and real_max_memuse < maxsize * memuse):
- if verbose:
- sys.stderr.write("Skipping %s because of memory "
- "constraint\n" % (f.__name__,))
- return
-
- return f(self, maxsize)
- wrapper.size = size
- wrapper.memuse = memuse
- wrapper.overhead = overhead
- return wrapper
- return decorator
-
-def bigaddrspacetest(f):
- """Decorator for tests that fill the address space."""
- def wrapper(self):
- if max_memuse < MAX_Py_ssize_t:
- if verbose:
- sys.stderr.write("Skipping %s because of memory "
- "constraint\n" % (f.__name__,))
- else:
- return f(self)
- return wrapper
-
-#=======================================================================
-# unittest integration.
-
-class BasicTestRunner:
- def run(self, test):
- result = unittest.TestResult()
- test(result)
- return result
-
-def _id(obj):
- return obj
-
-def requires_resource(resource):
- if resource == 'gui' and not _is_gui_available():
- return unittest.skip(_is_gui_available.reason)
- if is_resource_enabled(resource):
- return _id
- else:
- return unittest.skip("resource {0!r} is not enabled".format(resource))
-
-def cpython_only(test):
- """
- Decorator for tests only applicable on CPython.
- """
- return impl_detail(cpython=True)(test)
-
-def impl_detail(msg=None, **guards):
- if check_impl_detail(**guards):
- return _id
- if msg is None:
- guardnames, default = _parse_guards(guards)
- if default:
- msg = "implementation detail not available on {0}"
- else:
- msg = "implementation detail specific to {0}"
- guardnames = sorted(guardnames.keys())
- msg = msg.format(' or '.join(guardnames))
- return unittest.skip(msg)
-
-def _parse_guards(guards):
- # Returns a tuple ({platform_name: run_me}, default_value)
- if not guards:
- return ({'cpython': True}, False)
- is_true = guards.values()[0]
- assert guards.values() == [is_true] * len(guards) # all True or all False
- return (guards, not is_true)
-
-# Use the following check to guard CPython's implementation-specific tests --
-# or to run them only on the implementation(s) guarded by the arguments.
-def check_impl_detail(**guards):
- """This function returns True or False depending on the host platform.
- Examples:
- if check_impl_detail(): # only on CPython (default)
- if check_impl_detail(jython=True): # only on Jython
- if check_impl_detail(cpython=False): # everywhere except on CPython
- """
- guards, default = _parse_guards(guards)
- return guards.get(platform.python_implementation().lower(), default)
-
-
-
-def _run_suite(suite):
- """Run tests from a unittest.TestSuite-derived class."""
- if verbose:
- runner = unittest.TextTestRunner(sys.stdout, verbosity=2)
- else:
- runner = BasicTestRunner()
-
- result = runner.run(suite)
- if not result.wasSuccessful():
- if len(result.errors) == 1 and not result.failures:
- err = result.errors[0][1]
- elif len(result.failures) == 1 and not result.errors:
- err = result.failures[0][1]
- else:
- err = "multiple errors occurred"
- if not verbose:
- err += "; run in verbose mode for details"
- raise TestFailed(err)
-
-
-def run_unittest(*classes):
- """Run tests from unittest.TestCase-derived classes."""
- valid_types = (unittest.TestSuite, unittest.TestCase)
- suite = unittest.TestSuite()
- for cls in classes:
- if isinstance(cls, str):
- if cls in sys.modules:
- suite.addTest(unittest.findTestCases(sys.modules[cls]))
- else:
- raise ValueError("str arguments must be keys in sys.modules")
- elif isinstance(cls, valid_types):
- suite.addTest(cls)
- else:
- suite.addTest(unittest.makeSuite(cls))
- _run_suite(suite)
-
-#=======================================================================
-# Check for the presence of docstrings.
-
-HAVE_DOCSTRINGS = (check_impl_detail(cpython=False) or
- sys.platform == 'win32' or
- sysconfig.get_config_var('WITH_DOC_STRINGS'))
-
-requires_docstrings = unittest.skipUnless(HAVE_DOCSTRINGS,
- "test requires docstrings")
-
-
-#=======================================================================
-# doctest driver.
-
-def run_doctest(module, verbosity=None):
- """Run doctest on the given module. Return (#failures, #tests).
-
- If optional argument verbosity is not specified (or is None), pass
- test_support's belief about verbosity on to doctest. Else doctest's
- usual behavior is used (it searches sys.argv for -v).
- """
-
- import doctest
-
- if verbosity is None:
- verbosity = verbose
- else:
- verbosity = None
-
- # Direct doctest output (normally just errors) to real stdout; doctest
- # output shouldn't be compared by regrtest.
- save_stdout = sys.stdout
- sys.stdout = get_original_stdout()
- try:
- f, t = doctest.testmod(module, verbose=verbosity)
- if f:
- raise TestFailed("%d of %d doctests failed" % (f, t))
- finally:
- sys.stdout = save_stdout
- if verbose:
- print 'doctest (%s) ... %d tests with zero failures' % (module.__name__, t)
- return f, t
-
-#=======================================================================
-# Threading support to prevent reporting refleaks when running regrtest.py -R
-
-# NOTE: we use thread._count() rather than threading.enumerate() (or the
-# moral equivalent thereof) because a threading.Thread object is still alive
-# until its __bootstrap() method has returned, even after it has been
-# unregistered from the threading module.
-# thread._count(), on the other hand, only gets decremented *after* the
-# __bootstrap() method has returned, which gives us reliable reference counts
-# at the end of a test run.
-
-def threading_setup():
- if thread:
- return thread._count(),
- else:
- return 1,
-
-def threading_cleanup(nb_threads):
- if not thread:
- return
-
- _MAX_COUNT = 10
- for count in range(_MAX_COUNT):
- n = thread._count()
- if n == nb_threads:
- break
- time.sleep(0.1)
- # XXX print a warning in case of failure?
-
-def reap_threads(func):
- """Use this function when threads are being used. This will
- ensure that the threads are cleaned up even when the test fails.
- If threading is unavailable this function does nothing.
- """
- if not thread:
- return func
-
- @functools.wraps(func)
- def decorator(*args):
- key = threading_setup()
- try:
- return func(*args)
- finally:
- threading_cleanup(*key)
- return decorator
-
-def reap_children():
- """Use this function at the end of test_main() whenever sub-processes
- are started. This will help ensure that no extra children (zombies)
- stick around to hog resources and create problems when looking
- for refleaks.
- """
-
- # Reap all our dead child processes so we don't leave zombies around.
- # These hog resources and might be causing some of the buildbots to die.
- if hasattr(os, 'waitpid'):
- any_process = -1
- while True:
- try:
- # This will raise an exception on Windows. That's ok.
- pid, status = os.waitpid(any_process, os.WNOHANG)
- if pid == 0:
- break
- except:
- break
-
-@contextlib.contextmanager
-def start_threads(threads, unlock=None):
- threads = list(threads)
- started = []
- try:
- try:
- for t in threads:
- t.start()
- started.append(t)
- except:
- if verbose:
- print("Can't start %d threads, only %d threads started" %
- (len(threads), len(started)))
- raise
- yield
- finally:
- if unlock:
- unlock()
- endtime = starttime = time.time()
- for timeout in range(1, 16):
- endtime += 60
- for t in started:
- t.join(max(endtime - time.time(), 0.01))
- started = [t for t in started if t.isAlive()]
- if not started:
- break
- if verbose:
- print('Unable to join %d threads during a period of '
- '%d minutes' % (len(started), timeout))
- started = [t for t in started if t.isAlive()]
- if started:
- raise AssertionError('Unable to join %d threads' % len(started))
-
-@contextlib.contextmanager
-def swap_attr(obj, attr, new_val):
- """Temporary swap out an attribute with a new object.
-
- Usage:
- with swap_attr(obj, "attr", 5):
- ...
-
- This will set obj.attr to 5 for the duration of the with: block,
- restoring the old value at the end of the block. If `attr` doesn't
- exist on `obj`, it will be created and then deleted at the end of the
- block.
-
- The old value (or None if it doesn't exist) will be assigned to the
- target of the "as" clause, if there is one.
- """
- if hasattr(obj, attr):
- real_val = getattr(obj, attr)
- setattr(obj, attr, new_val)
- try:
- yield real_val
- finally:
- setattr(obj, attr, real_val)
- else:
- setattr(obj, attr, new_val)
- try:
- yield
- finally:
- if hasattr(obj, attr):
- delattr(obj, attr)
-
-@contextlib.contextmanager
-def swap_item(obj, item, new_val):
- """Temporary swap out an item with a new object.
-
- Usage:
- with swap_item(obj, "item", 5):
- ...
-
- This will set obj["item"] to 5 for the duration of the with: block,
- restoring the old value at the end of the block. If `item` doesn't
- exist on `obj`, it will be created and then deleted at the end of the
- block.
-
- The old value (or None if it doesn't exist) will be assigned to the
- target of the "as" clause, if there is one.
- """
- if item in obj:
- real_val = obj[item]
- obj[item] = new_val
- try:
- yield real_val
- finally:
- obj[item] = real_val
- else:
- obj[item] = new_val
- try:
- yield
- finally:
- if item in obj:
- del obj[item]
-
-def py3k_bytes(b):
- """Emulate the py3k bytes() constructor.
-
- NOTE: This is only a best effort function.
- """
- try:
- # memoryview?
- return b.tobytes()
- except AttributeError:
- try:
- # iterable of ints?
- return b"".join(chr(x) for x in b)
- except TypeError:
- return bytes(b)
-
-def args_from_interpreter_flags():
- """Return a list of command-line arguments reproducing the current
- settings in sys.flags."""
- import subprocess
- return subprocess._args_from_interpreter_flags()
-
-def strip_python_stderr(stderr):
- """Strip the stderr of a Python process from potential debug output
- emitted by the interpreter.
-
- This will typically be run on the result of the communicate() method
- of a subprocess.Popen object.
- """
- stderr = re.sub(br"\[\d+ refs\]\r?\n?$", b"", stderr).strip()
- return stderr
-
-
-def check_free_after_iterating(test, iter, cls, args=()):
- class A(cls):
- def __del__(self):
- done[0] = True
- try:
- next(it)
- except StopIteration:
- pass
-
- done = [False]
- it = iter(A(*args))
- # Issue 26494: Shouldn't crash
- test.assertRaises(StopIteration, next, it)
- # The sequence should be deallocated just after the end of iterating
- gc_collect()
- test.assertTrue(done[0])
-
-@contextlib.contextmanager
-def disable_gc():
- have_gc = gc.isenabled()
- gc.disable()
- try:
- yield
- finally:
- if have_gc:
- gc.enable()
+import test.support
+sys.modules['test.test_support'] = test.support
Tests
-----
+- bpo-30207: To simplify backports from Python 3, the test.test_support
+ module was converted into a package and renamed to test.support. The
+ test.script_helper module was moved into the test.support package.
+ Names test.test_support and test.script_helper are left as aliases to
+ test.support and test.support.script_helper.
+
- bpo-30197: Enhanced function swap_attr() in the test.test_support module.
It now works when delete replaced attribute inside the with statement. The
old value of the attribute (or None if it doesn't exist) now will be