import time
import struct
import sysconfig
+import types
try:
import thread
except ImportError:
try:
attribute = getattr(obj, name)
except AttributeError:
- raise unittest.SkipTest("module %s has no attribute %s" % (
- obj.__name__, name))
+ if isinstance(obj, types.ModuleType):
+ msg = "module %r has no attribute %r" % (obj.__name__, name)
+ elif isinstance(obj, types.ClassType):
+ msg = "class %s has no attribute %r" % (obj.__name__, name)
+ elif isinstance(obj, types.InstanceType):
+ msg = "%s instance has no attribute %r" % (obj.__class__.__name__, name)
+ elif isinstance(obj, type):
+ msg = "type object %r has no attribute %r" % (obj.__name__, name)
+ else:
+ msg = "%r object has no attribute %r" % (type(obj).__name__, name)
+ raise unittest.SkipTest(msg)
else:
return attribute
# Save the initial cwd
SAVEDCWD = os.getcwd()
+@contextlib.contextmanager
+def temp_dir(path=None, quiet=False):
+ """Return a context manager that creates a temporary directory.
+
+ Arguments:
+
+ path: the directory to create temporarily. If omitted or None,
+ defaults to creating a temporary directory using tempfile.mkdtemp.
+
+ quiet: if False (the default), the context manager raises an exception
+ on error. Otherwise, if the path is specified and cannot be
+ created, only a warning is issued.
+
+ """
+ dir_created = False
+ if path is None:
+ import tempfile
+ path = tempfile.mkdtemp()
+ dir_created = True
+ path = os.path.realpath(path)
+ else:
+ if (have_unicode and isinstance(path, unicode) and
+ not os.path.supports_unicode_filenames):
+ try:
+ path = path.encode(sys.getfilesystemencoding() or 'ascii')
+ except UnicodeEncodeError:
+ if not quiet:
+ raise unittest.SkipTest('unable to encode the cwd name with '
+ 'the filesystem encoding.')
+ try:
+ os.mkdir(path)
+ dir_created = True
+ except OSError:
+ if not quiet:
+ raise
+ warnings.warn('tests may fail, unable to create temp dir: ' + path,
+ RuntimeWarning, stacklevel=3)
+ try:
+ yield path
+ finally:
+ if dir_created:
+ rmtree(path)
+
@contextlib.contextmanager
def change_cwd(path, quiet=False):
"""Return a context manager that changes the current working directory.
@contextlib.contextmanager
def temp_cwd(name='tempcwd', quiet=False):
"""
- Context manager that creates a temporary directory and set it as CWD.
+ Context manager that temporarily creates and changes the CWD.
+
+ The function temporarily changes the current working directory
+ after creating a temporary directory in the current directory with
+ name *name*. If *name* is None, the temporary directory is
+ created using tempfile.mkdtemp.
+
+ If *quiet* is False (default) and it is not possible to
+ create or change the CWD, an error is raised. If *quiet* is True,
+ only a warning is raised and the original CWD is used.
- The new CWD is created in the current directory and it's named *name*.
- If *quiet* is False (default) and it's not possible to create or change
- the CWD, an error is raised. If it's True, only a warning is raised
- and the original CWD is used.
"""
- if (have_unicode and isinstance(name, unicode) and
- not os.path.supports_unicode_filenames):
- try:
- name = name.encode(sys.getfilesystemencoding() or 'ascii')
- except UnicodeEncodeError:
- if not quiet:
- raise unittest.SkipTest('unable to encode the cwd name with '
- 'the filesystem encoding.')
- saved_dir = os.getcwd()
- is_temporary = False
- try:
- os.mkdir(name)
- os.chdir(name)
- is_temporary = True
- except OSError:
- if not quiet:
- raise
- warnings.warn('tests may fail, unable to change the CWD to ' + name,
- RuntimeWarning, stacklevel=3)
- try:
- yield os.getcwd()
- finally:
- os.chdir(saved_dir)
- if is_temporary:
- rmtree(name)
+ with temp_dir(path=name, quiet=quiet) as temp_path:
+ with change_cwd(temp_path, quiet=quiet) as cwd_dir:
+ yield cwd_dir
# TEST_HOME_DIR refers to the top level directory of the "test" package
# that contains Python's regression test suite
file.close()
unlink(TESTFN)
-def check_syntax_error(testcase, statement):
- testcase.assertRaises(SyntaxError, compile, statement,
- '<test string>', 'exec')
+def check_syntax_error(testcase, statement, lineno=None, offset=None):
+ with testcase.assertRaises(SyntaxError) as cm:
+ compile(statement, '<test string>', 'exec')
+ err = cm.exception
+ if lineno is not None:
+ testcase.assertEqual(err.lineno, lineno)
+ if offset is not None:
+ testcase.assertEqual(err.offset, offset)
def open_urlresource(url, check=None):
import urlparse, urllib2
finally:
if have_gc:
gc.enable()
+
+
+def python_is_optimized():
+ """Find if Python was built with optimizations."""
+ cflags = sysconfig.get_config_var('PY_CFLAGS') or ''
+ final_opt = ""
+ for opt in cflags.split():
+ if opt.startswith('-O'):
+ final_opt = opt
+ return final_opt not in ('', '-O0', '-Og')
--- /dev/null
+import importlib
+import shutil
+import stat
+import sys
+import os
+import unittest
+import socket
+import tempfile
+import errno
+from test import support
+
+TESTFN = support.TESTFN
+
+
+class ClassicClass:
+ pass
+
+class NewStyleClass(object):
+ pass
+
+
+class TestSupport(unittest.TestCase):
+
+ def test_import_module(self):
+ support.import_module("ftplib")
+ self.assertRaises(unittest.SkipTest, support.import_module, "foo")
+
+ def test_import_fresh_module(self):
+ support.import_fresh_module("ftplib")
+
+ def test_get_attribute(self):
+ self.assertEqual(support.get_attribute(self, "test_get_attribute"),
+ self.test_get_attribute)
+ self.assertRaises(unittest.SkipTest, support.get_attribute, self, "foo")
+ with self.assertRaisesRegexp(unittest.SkipTest, 'unittest'):
+ support.get_attribute(unittest, 'foo')
+ with self.assertRaisesRegexp(unittest.SkipTest, 'ClassicClass'):
+ support.get_attribute(ClassicClass, 'foo')
+ with self.assertRaisesRegexp(unittest.SkipTest, 'ClassicClass'):
+ support.get_attribute(ClassicClass(), 'foo')
+ with self.assertRaisesRegexp(unittest.SkipTest, 'NewStyleClass'):
+ support.get_attribute(NewStyleClass, 'foo')
+ with self.assertRaisesRegexp(unittest.SkipTest, 'NewStyleClass'):
+ support.get_attribute(NewStyleClass(), 'foo')
+
+ @unittest.skip("failing buildbots")
+ def test_get_original_stdout(self):
+ self.assertEqual(support.get_original_stdout(), sys.stdout)
+
+ def test_unload(self):
+ import sched
+ self.assertIn("sched", sys.modules)
+ support.unload("sched")
+ self.assertNotIn("sched", sys.modules)
+
+ def test_unlink(self):
+ with open(TESTFN, "w") as f:
+ pass
+ support.unlink(TESTFN)
+ self.assertFalse(os.path.exists(TESTFN))
+ support.unlink(TESTFN)
+
+ def test_rmtree(self):
+ dirpath = support.TESTFN + 'd'
+ subdirpath = os.path.join(dirpath, 'subdir')
+ os.mkdir(dirpath)
+ os.mkdir(subdirpath)
+ support.rmtree(dirpath)
+ self.assertFalse(os.path.exists(dirpath))
+ with support.swap_attr(support, 'verbose', 0):
+ support.rmtree(dirpath)
+
+ os.mkdir(dirpath)
+ os.mkdir(subdirpath)
+ os.chmod(dirpath, stat.S_IRUSR|stat.S_IXUSR)
+ with support.swap_attr(support, 'verbose', 0):
+ support.rmtree(dirpath)
+ self.assertFalse(os.path.exists(dirpath))
+
+ os.mkdir(dirpath)
+ os.mkdir(subdirpath)
+ os.chmod(dirpath, 0)
+ with support.swap_attr(support, 'verbose', 0):
+ support.rmtree(dirpath)
+ self.assertFalse(os.path.exists(dirpath))
+
+ def test_forget(self):
+ mod_filename = TESTFN + '.py'
+ with open(mod_filename, 'wt') as f:
+ f.write('foo = 1\n')
+ sys.path.insert(0, os.curdir)
+ try:
+ mod = __import__(TESTFN)
+ self.assertIn(TESTFN, sys.modules)
+
+ support.forget(TESTFN)
+ self.assertNotIn(TESTFN, sys.modules)
+ finally:
+ del sys.path[0]
+ support.unlink(mod_filename)
+ support.rmtree('__pycache__')
+
+ def test_HOST(self):
+ s = socket.socket()
+ s.bind((support.HOST, 0))
+ s.close()
+
+ def test_find_unused_port(self):
+ port = support.find_unused_port()
+ s = socket.socket()
+ s.bind((support.HOST, port))
+ s.close()
+
+ def test_bind_port(self):
+ s = socket.socket()
+ support.bind_port(s)
+ s.listen(5)
+ s.close()
+
+ # Tests for temp_dir()
+
+ def test_temp_dir(self):
+ """Test that temp_dir() creates and destroys its directory."""
+ parent_dir = tempfile.mkdtemp()
+ parent_dir = os.path.realpath(parent_dir)
+
+ try:
+ path = os.path.join(parent_dir, 'temp')
+ self.assertFalse(os.path.isdir(path))
+ with support.temp_dir(path) as temp_path:
+ self.assertEqual(temp_path, path)
+ self.assertTrue(os.path.isdir(path))
+ self.assertFalse(os.path.isdir(path))
+ finally:
+ support.rmtree(parent_dir)
+
+ def test_temp_dir__path_none(self):
+ """Test passing no path."""
+ with support.temp_dir() as temp_path:
+ self.assertTrue(os.path.isdir(temp_path))
+ self.assertFalse(os.path.isdir(temp_path))
+
+ def test_temp_dir__existing_dir__quiet_default(self):
+ """Test passing a directory that already exists."""
+ def call_temp_dir(path):
+ with support.temp_dir(path) as temp_path:
+ raise Exception("should not get here")
+
+ path = tempfile.mkdtemp()
+ path = os.path.realpath(path)
+ try:
+ self.assertTrue(os.path.isdir(path))
+ with self.assertRaises(OSError) as cm:
+ call_temp_dir(path)
+ self.assertEqual(cm.exception.errno, errno.EEXIST)
+ # Make sure temp_dir did not delete the original directory.
+ self.assertTrue(os.path.isdir(path))
+ finally:
+ shutil.rmtree(path)
+
+ def test_temp_dir__existing_dir__quiet_true(self):
+ """Test passing a directory that already exists with quiet=True."""
+ path = tempfile.mkdtemp()
+ path = os.path.realpath(path)
+
+ try:
+ with support.check_warnings() as recorder:
+ with support.temp_dir(path, quiet=True) as temp_path:
+ self.assertEqual(path, temp_path)
+ warnings = [str(w.message) for w in recorder.warnings]
+ # Make sure temp_dir did not delete the original directory.
+ self.assertTrue(os.path.isdir(path))
+ finally:
+ shutil.rmtree(path)
+
+ expected = ['tests may fail, unable to create temp dir: ' + path]
+ self.assertEqual(warnings, expected)
+
+ # Tests for change_cwd()
+
+ def test_change_cwd(self):
+ original_cwd = os.getcwd()
+
+ with support.temp_dir() as temp_path:
+ with support.change_cwd(temp_path) as new_cwd:
+ self.assertEqual(new_cwd, temp_path)
+ self.assertEqual(os.getcwd(), new_cwd)
+
+ self.assertEqual(os.getcwd(), original_cwd)
+
+ def test_change_cwd__non_existent_dir(self):
+ """Test passing a non-existent directory."""
+ original_cwd = os.getcwd()
+
+ def call_change_cwd(path):
+ with support.change_cwd(path) as new_cwd:
+ raise Exception("should not get here")
+
+ with support.temp_dir() as parent_dir:
+ non_existent_dir = os.path.join(parent_dir, 'does_not_exist')
+ with self.assertRaises(OSError) as cm:
+ call_change_cwd(non_existent_dir)
+ self.assertEqual(cm.exception.errno, errno.ENOENT)
+
+ self.assertEqual(os.getcwd(), original_cwd)
+
+ def test_change_cwd__non_existent_dir__quiet_true(self):
+ """Test passing a non-existent directory with quiet=True."""
+ original_cwd = os.getcwd()
+
+ with support.temp_dir() as parent_dir:
+ bad_dir = os.path.join(parent_dir, 'does_not_exist')
+ with support.check_warnings() as recorder:
+ with support.change_cwd(bad_dir, quiet=True) as new_cwd:
+ self.assertEqual(new_cwd, original_cwd)
+ self.assertEqual(os.getcwd(), new_cwd)
+ warnings = [str(w.message) for w in recorder.warnings]
+
+ expected = ['tests may fail, unable to change CWD to: ' + bad_dir]
+ self.assertEqual(warnings, expected)
+
+ # Tests for change_cwd()
+
+ def test_change_cwd__chdir_warning(self):
+ """Check the warning message when os.chdir() fails."""
+ path = TESTFN + '_does_not_exist'
+ with support.check_warnings() as recorder:
+ with support.change_cwd(path=path, quiet=True):
+ pass
+ messages = [str(w.message) for w in recorder.warnings]
+ self.assertEqual(messages, ['tests may fail, unable to change CWD to: ' + path])
+
+ # Tests for temp_cwd()
+
+ def test_temp_cwd(self):
+ here = os.getcwd()
+ with support.temp_cwd(name=TESTFN):
+ self.assertEqual(os.path.basename(os.getcwd()), TESTFN)
+ self.assertFalse(os.path.exists(TESTFN))
+ self.assertTrue(os.path.basename(os.getcwd()), here)
+
+
+ def test_temp_cwd__name_none(self):
+ """Test passing None to temp_cwd()."""
+ original_cwd = os.getcwd()
+ with support.temp_cwd(name=None) as new_cwd:
+ self.assertNotEqual(new_cwd, original_cwd)
+ self.assertTrue(os.path.isdir(new_cwd))
+ self.assertEqual(os.getcwd(), new_cwd)
+ self.assertEqual(os.getcwd(), original_cwd)
+
+ def test_sortdict(self):
+ self.assertEqual(support.sortdict({3:3, 2:2, 1:1}), "{1: 1, 2: 2, 3: 3}")
+
+ def test_make_bad_fd(self):
+ fd = support.make_bad_fd()
+ with self.assertRaises(OSError) as cm:
+ os.write(fd, b"foo")
+ self.assertEqual(cm.exception.errno, errno.EBADF)
+
+ def test_check_syntax_error(self):
+ support.check_syntax_error(self, "def class", lineno=1, offset=9)
+ with self.assertRaises(AssertionError):
+ support.check_syntax_error(self, "x=1")
+
+ def test_CleanImport(self):
+ import importlib
+ with support.CleanImport("asyncore"):
+ importlib.import_module("asyncore")
+
+ def test_DirsOnSysPath(self):
+ with support.DirsOnSysPath('foo', 'bar'):
+ self.assertIn("foo", sys.path)
+ self.assertIn("bar", sys.path)
+ self.assertNotIn("foo", sys.path)
+ self.assertNotIn("bar", sys.path)
+
+ def test_captured_stdout(self):
+ with support.captured_stdout() as stdout:
+ print "hello"
+ self.assertEqual(stdout.getvalue(), "hello\n")
+
+ def test_captured_stderr(self):
+ with support.captured_stderr() as stderr:
+ print >>sys.stderr, "hello"
+ self.assertEqual(stderr.getvalue(), "hello\n")
+
+ def test_captured_stdin(self):
+ with support.captured_stdin() as stdin:
+ stdin.write('hello\n')
+ stdin.seek(0)
+ # call test code that consumes from sys.stdin
+ captured = raw_input()
+ self.assertEqual(captured, "hello")
+
+ def test_gc_collect(self):
+ support.gc_collect()
+
+ def test_python_is_optimized(self):
+ self.assertIsInstance(support.python_is_optimized(), bool)
+
+ def test_swap_attr(self):
+ class Obj:
+ pass
+ obj = Obj()
+ obj.x = 1
+ with support.swap_attr(obj, "x", 5) as x:
+ self.assertEqual(obj.x, 5)
+ self.assertEqual(x, 1)
+ self.assertEqual(obj.x, 1)
+ with support.swap_attr(obj, "y", 5) as y:
+ self.assertEqual(obj.y, 5)
+ self.assertIsNone(y)
+ self.assertFalse(hasattr(obj, 'y'))
+ with support.swap_attr(obj, "y", 5):
+ del obj.y
+ self.assertFalse(hasattr(obj, 'y'))
+
+ def test_swap_item(self):
+ D = {"x":1}
+ with support.swap_item(D, "x", 5) as x:
+ self.assertEqual(D["x"], 5)
+ self.assertEqual(x, 1)
+ self.assertEqual(D["x"], 1)
+ with support.swap_item(D, "y", 5) as y:
+ self.assertEqual(D["y"], 5)
+ self.assertIsNone(y)
+ self.assertNotIn("y", D)
+ with support.swap_item(D, "y", 5):
+ del D["y"]
+ self.assertNotIn("y", D)
+
+ # XXX -follows a list of untested API
+ # make_legacy_pyc
+ # is_resource_enabled
+ # requires
+ # fcmp
+ # umaks
+ # findfile
+ # check_warnings
+ # EnvironmentVarGuard
+ # TransientResource
+ # transient_internet
+ # run_with_locale
+ # set_memlimit
+ # bigmemtest
+ # precisionbigmemtest
+ # bigaddrspacetest
+ # requires_resource
+ # run_doctest
+ # threading_cleanup
+ # reap_threads
+ # reap_children
+ # strip_python_stderr
+ # args_from_interpreter_flags
+ # can_symlink
+ # skip_unless_symlink
+ # SuppressCrashReport
+
+
+def test_main():
+ tests = [TestSupport]
+ support.run_unittest(*tests)
+
+if __name__ == '__main__':
+ test_main()