Patch by Serhiy Storchaka.
.. versionadded:: 3.2
- # in the list throw an exception
+.. class:: ExitStack()
+
+ A context manager that is designed to make it easy to programmatically
+ combine other context managers and cleanup functions, especially those
+ that are optional or otherwise driven by input data.
+
+ For example, a set of files may easily be handled in a single with
+ statement as follows::
+
+ with ExitStack() as stack:
+ files = [stack.enter_context(open(fname)) for fname in filenames]
+ # All opened files will automatically be closed at the end of
+ # the with statement, even if attempts to open files later
++ # in the list raise an exception
+
+ Each instance maintains a stack of registered callbacks that are called in
+ reverse order when the instance is closed (either explicitly or implicitly
+ at the end of a :keyword:`with` statement). Note that callbacks are *not*
+ invoked implicitly when the context stack instance is garbage collected.
+
+ This stack model is used so that context managers that acquire their
+ resources in their ``__init__`` method (such as file objects) can be
+ handled correctly.
+
+ Since registered callbacks are invoked in the reverse order of
+ registration, this ends up behaving as if multiple nested :keyword:`with`
+ statements had been used with the registered set of callbacks. This even
+ extends to exception handling - if an inner callback suppresses or replaces
+ an exception, then outer callbacks will be passed arguments based on that
+ updated state.
+
+ This is a relatively low level API that takes care of the details of
+ correctly unwinding the stack of exit callbacks. It provides a suitable
+ foundation for higher level context managers that manipulate the exit
+ stack in application specific ways.
+
+ .. versionadded:: 3.3
+
+ .. method:: enter_context(cm)
+
+ Enters a new context manager and adds its :meth:`__exit__` method to
+ the callback stack. The return value is the result of the context
+ manager's own :meth:`__enter__` method.
+
+ These context managers may suppress exceptions just as they normally
+ would if used directly as part of a :keyword:`with` statement.
+
+ .. method:: push(exit)
+
+ Adds a context manager's :meth:`__exit__` method to the callback stack.
+
+ As ``__enter__`` is *not* invoked, this method can be used to cover
+ part of an :meth:`__enter__` implementation with a context manager's own
+ :meth:`__exit__` method.
+
+ If passed an object that is not a context manager, this method assumes
+ it is a callback with the same signature as a context manager's
+ :meth:`__exit__` method and adds it directly to the callback stack.
+
+ By returning true values, these callbacks can suppress exceptions the
+ same way context manager :meth:`__exit__` methods can.
+
+ The passed in object is returned from the function, allowing this
+ method to be used as a function decorator.
+
+ .. method:: callback(callback, *args, **kwds)
+
+ Accepts an arbitrary callback function and arguments and adds it to
+ the callback stack.
+
+ Unlike the other methods, callbacks added this way cannot suppress
+ exceptions (as they are never passed the exception details).
+
+ The passed in callback is returned from the function, allowing this
+ method to be used as a function decorator.
+
+ .. method:: pop_all()
+
+ Transfers the callback stack to a fresh :class:`ExitStack` instance
+ and returns it. No callbacks are invoked by this operation - instead,
+ they will now be invoked when the new stack is closed (either
+ explicitly or implicitly at the end of a :keyword:`with` statement).
+
+ For example, a group of files can be opened as an "all or nothing"
+ operation as follows::
+
+ with ExitStack() as stack:
+ files = [stack.enter_context(open(fname)) for fname in filenames]
+ close_files = stack.pop_all().close
+ # If opening any file fails, all previously opened files will be
+ # closed automatically. If all files are opened successfully,
+ # they will remain open even after the with statement ends.
+ # close_files() can then be invoked explicitly to close them all
+
+ .. method:: close()
+
+ Immediately unwinds the callback stack, invoking callbacks in the
+ reverse order of registration. For any context managers and exit
+ callbacks registered, the arguments passed in will indicate that no
+ exception occurred.
+
+
+Examples and Recipes
+--------------------
+
+This section describes some examples and recipes for making effective use of
+the tools provided by :mod:`contextlib`.
+
+
+Supporting a variable number of context managers
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The primary use case for :class:`ExitStack` is the one given in the class
+documentation: supporting a variable number of context managers and other
+cleanup operations in a single :keyword:`with` statement. The variability
+may come from the number of context managers needed being driven by user
+input (such as opening a user specified collection of files), or from
+some of the context managers being optional::
+
+ with ExitStack() as stack:
+ for resource in resources:
+ stack.enter_context(resource)
+ if need_special resource:
+ special = acquire_special_resource()
+ stack.callback(release_special_resource, special)
+ # Perform operations that use the acquired resources
+
+As shown, :class:`ExitStack` also makes it quite easy to use :keyword:`with`
+statements to manage arbitrary resources that don't natively support the
+context management protocol.
+
+
+Simplifying support for single optional context managers
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+In the specific case of a single optional context manager, :class:`ExitStack`
+instances can be used as a "do nothing" context manager, allowing a context
+manager to easily be omitted without affecting the overall structure of
+the source code::
+
+ def debug_trace(details):
+ if __debug__:
+ return TraceContext(details)
+ # Don't do anything special with the context in release mode
+ return ExitStack()
+
+ with debug_trace():
+ # Suite is traced in debug mode, but runs normally otherwise
+
+
+Catching exceptions from ``__enter__`` methods
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+It is occasionally desirable to catch exceptions from an ``__enter__``
+method implementation, *without* inadvertently catching exceptions from
+the :keyword:`with` statement body or the context manager's ``__exit__``
+method. By using :class:`ExitStack` the steps in the context management
+protocol can be separated slightly in order to allow this::
+
+ stack = ExitStack()
+ try:
+ x = stack.enter_context(cm)
+ except Exception:
+ # handle __enter__ exception
+ else:
+ with stack:
+ # Handle normal case
+
+Actually needing to do this is likely to indicate that the underlying API
+should be providing a direct resource management interface for use with
+:keyword:`try`/:keyword:`except`/:keyword:`finally` statements, but not
+all APIs are well designed in that regard. When a context manager is the
+only resource management API provided, then :class:`ExitStack` can make it
+easier to handle various situations that can't be handled directly in a
+:keyword:`with` statement.
+
+
+Cleaning up in an ``__enter__`` implementation
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+As noted in the documentation of :meth:`ExitStack.push`, this
+method can be useful in cleaning up an already allocated resource if later
+steps in the :meth:`__enter__` implementation fail.
+
+Here's an example of doing this for a context manager that accepts resource
+acquisition and release functions, along with an optional validation function,
+and maps them to the context management protocol::
+
+ from contextlib import contextmanager, ExitStack
+
+ class ResourceManager(object):
+
+ def __init__(self, acquire_resource, release_resource, check_resource_ok=None):
+ self.acquire_resource = acquire_resource
+ self.release_resource = release_resource
+ if check_resource_ok is None:
+ def check_resource_ok(resource):
+ return True
+ self.check_resource_ok = check_resource_ok
+
+ @contextmanager
+ def _cleanup_on_error(self):
+ with ExitStack() as stack:
+ stack.push(self)
+ yield
+ # The validation check passed and didn't raise an exception
+ # Accordingly, we want to keep the resource, and pass it
+ # back to our caller
+ stack.pop_all()
+
+ def __enter__(self):
+ resource = self.acquire_resource()
+ with self._cleanup_on_error():
+ if not self.check_resource_ok(resource):
+ msg = "Failed validation for {!r}"
+ raise RuntimeError(msg.format(resource))
+ return resource
+
+ def __exit__(self, *exc_details):
+ # We don't need to duplicate any of our resource release logic
+ self.release_resource()
+
+
+Replacing any use of ``try-finally`` and flag variables
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+A pattern you will sometimes see is a ``try-finally`` statement with a flag
+variable to indicate whether or not the body of the ``finally`` clause should
+be executed. In its simplest form (that can't already be handled just by
+using an ``except`` clause instead), it looks something like this::
+
+ cleanup_needed = True
+ try:
+ result = perform_operation()
+ if result:
+ cleanup_needed = False
+ finally:
+ if cleanup_needed:
+ cleanup_resources()
+
+As with any ``try`` statement based code, this can cause problems for
+development and review, because the setup code and the cleanup code can end
+up being separated by arbitrarily long sections of code.
+
+:class:`ExitStack` makes it possible to instead register a callback for
+execution at the end of a ``with`` statement, and then later decide to skip
+executing that callback::
+
+ from contextlib import ExitStack
+
+ with ExitStack() as stack:
+ stack.callback(cleanup_resources)
+ result = perform_operation()
+ if result:
+ stack.pop_all()
+
+This allows the intended cleanup up behaviour to be made explicit up front,
+rather than requiring a separate flag variable.
+
+If a particular application uses this pattern a lot, it can be simplified
+even further by means of a small helper class::
+
+ from contextlib import ExitStack
+
+ class Callback(ExitStack):
+ def __init__(self, callback, *args, **kwds):
+ super(Callback, self).__init__()
+ self.callback(callback, *args, **kwds)
+
+ def cancel(self):
+ self.pop_all()
+
+ with Callback(cleanup_resources) as cb:
+ result = perform_operation()
+ if result:
+ cb.cancel()
+
+If the resource cleanup isn't already neatly bundled into a standalone
+function, then it is still possible to use the decorator form of
+:meth:`ExitStack.callback` to declare the resource cleanup in
+advance::
+
+ from contextlib import ExitStack
+
+ with ExitStack() as stack:
+ @stack.callback
+ def cleanup_resources():
+ ...
+ result = perform_operation()
+ if result:
+ stack.pop_all()
+
+Due to the way the decorator protocol works, a callback function
+declared this way cannot take any parameters. Instead, any resources to
+be released must be accessed as closure variables
+
+
+Using a context manager as a function decorator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+:class:`ContextDecorator` makes it possible to use a context manager in
+both an ordinary ``with`` statement and also as a function decorator.
+
+For example, it is sometimes useful to wrap functions or groups of statements
+with a logger that can track the time of entry and time of exit. Rather than
+writing both a function decorator and a context manager for the task,
+inheriting from :class:`ContextDecorator` provides both capabilities in a
+single definition::
+
+ from contextlib import ContextDecorator
+ import logging
+
+ logging.basicConfig(level=logging.INFO)
+
+ class track_entry_and_exit(ContextDecorator):
+ def __init__(self, name):
+ self.name = name
+
+ def __enter__(self):
+ logging.info('Entering: {}'.format(name))
+
+ def __exit__(self, exc_type, exc, exc_tb):
+ logging.info('Exiting: {}'.format(name))
+
+Instances of this class can be used as both a context manager::
+
+ with track_entry_and_exit('widget loader'):
+ print('Some time consuming activity goes here')
+ load_widget()
+
+And also as a function decorator::
+
+ @track_entry_and_exit('widget loader')
+ def activity():
+ print('Some time consuming activity goes here')
+ load_widget()
+
+Note that there is one additional limitation when using context managers
+as function decorators: there's no way to access the return value of
+:meth:`__enter__`. If that value is needed, then it is still necessary to use
+an explicit ``with`` statement.
+
.. seealso::
:pep:`0343` - The "with" statement
with SSL support). If *host* is not specified, ``''`` (the local host) is used.
If *port* is omitted, the standard IMAP4-over-SSL port (993) is used. *keyfile*
and *certfile* are also optional - they can contain a PEM formatted private key
- and certificate chain file for the SSL connection.
+ and certificate chain file for the SSL connection. *ssl_context* parameter is a
+ :class:`ssl.SSLContext` object which allows bundling SSL configuration
+ options, certificates and private keys into a single (potentially long-lived)
+ structure. Note that the *keyfile*/*certfile* parameters are mutually exclusive with *ssl_context*,
- a :class:`ValueError` is thrown if *keyfile*/*certfile* is provided along with *ssl_context*.
++ a :class:`ValueError` is raised if *keyfile*/*certfile* is provided along with *ssl_context*.
+
+ .. versionchanged:: 3.3
+ *ssl_context* parameter added.
The second subclass allows for connections created by a child process:
the C library.
- is thrown.
+.. data:: RTLD_LAZY
+ RTLD_NOW
+ RTLD_GLOBAL
+ RTLD_LOCAL
+ RTLD_NODELETE
+ RTLD_NOLOAD
+ RTLD_DEEPBIND
+
+ See the Unix manual page :manpage:`dlopen(3)`.
+
+ .. versionadded:: 3.3
+
+
+.. _terminal-size:
+
+Querying the size of a terminal
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. versionadded:: 3.3
+
+.. function:: get_terminal_size(fd=STDOUT_FILENO)
+
+ Return the size of the terminal window as ``(columns, lines)``,
+ tuple of type :class:`terminal_size`.
+
+ The optional argument ``fd`` (default ``STDOUT_FILENO``, or standard
+ output) specifies which file descriptor should be queried.
+
+ If the file descriptor is not connected to a terminal, an :exc:`OSError`
++ is raised.
+
+ :func:`shutil.get_terminal_size` is the high-level function which
+ should normally be used, ``os.get_terminal_size`` is the low-level
+ implementation.
+
+ Availability: Unix, Windows.
+
+.. class:: terminal_size
+
+ A subclass of tuple, holding ``(columns, lines)`` of the terminal window size.
+
+ .. attribute:: columns
+
+ Width of the terminal window in characters.
+
+ .. attribute:: lines
+
+ Height of the terminal window in characters.
+
+
.. _os-file-dir:
Files and Directories
Availability: Unix.
+ .. versionadded:: 3.3
+ Added support for specifying an open file descriptor for *path*.
-.. function:: symlink(source, link_name)
- symlink(source, link_name, target_is_directory=False)
- Create a symbolic link pointing to *source* named *link_name*.
+.. data:: supports_dir_fd
+
+ A :class:`~collections.Set` object indicating which functions in the
+ :mod:`os` module permit use of their *dir_fd* parameter. Different platforms
+ provide different functionality, and an option that might work on one might
+ be unsupported on another. For consistency's sakes, functions that support
- *dir_fd* always allow specifying the parameter, but will throw an exception
++ *dir_fd* always allow specifying the parameter, but will raise an exception
+ if the functionality is not actually available.
+
+ To check whether a particular function permits use of its *dir_fd*
+ parameter, use the ``in`` operator on ``supports_dir_fd``. As an example,
+ this expression determines whether the *dir_fd* parameter of :func:`os.stat`
+ is locally available::
+
+ os.stat in os.supports_dir_fd
+
+ Currently *dir_fd* parameters only work on Unix platforms; none of them work
+ on Windows.
+
+ .. versionadded:: 3.3
+
+
+.. data:: supports_effective_ids
+
+ A :class:`~collections.Set` object indicating which functions in the
+ :mod:`os` module permit use of the *effective_ids* parameter for
+ :func:`os.access`. If the local platform supports it, the collection will
+ contain :func:`os.access`, otherwise it will be empty.
+
+ To check whether you can use the *effective_ids* parameter for
+ :func:`os.access`, use the ``in`` operator on ``supports_dir_fd``, like so::
+
+ os.access in os.supports_effective_ids
+
+ Currently *effective_ids* only works on Unix platforms; it does not work on
+ Windows.
+
+ .. versionadded:: 3.3
+
- On Windows, symlink version takes an additional optional parameter,
- *target_is_directory*, which defaults to ``False``.
+.. data:: supports_fd
- On Windows, a symlink represents a file or a directory, and does not morph to
- the target dynamically. If *target_is_directory* is set to ``True``, the
- symlink will be created as a directory symlink, otherwise as a file symlink
- (the default).
+ A :class:`~collections.Set` object indicating which functions in the
+ :mod:`os` module permit specifying their *path* parameter as an open file
+ descriptor. Different platforms provide different functionality, and an
+ option that might work on one might be unsupported on another. For
+ consistency's sakes, functions that support *fd* always allow specifying
- the parameter, but will throw an exception if the functionality is not
++ the parameter, but will raise an exception if the functionality is not
+ actually available.
+
+ To check whether a particular function permits specifying an open file
+ descriptor for its *path* parameter, use the ``in`` operator on
+ ``supports_fd``. As an example, this expression determines whether
+ :func:`os.chdir` accepts open file descriptors when called on your local
+ platform::
+
+ os.chdir in os.supports_fd
+
+ .. versionadded:: 3.3
+
+
+.. data:: supports_follow_symlinks
+
+ A :class:`~collections.Set` object indicating which functions in the
+ :mod:`os` module permit use of their *follow_symlinks* parameter. Different
+ platforms provide different functionality, and an option that might work on
+ one might be unsupported on another. For consistency's sakes, functions that
+ support *follow_symlinks* always allow specifying the parameter, but will
- throw an exception if the functionality is not actually available.
++ raise an exception if the functionality is not actually available.
+
+ To check whether a particular function permits use of its *follow_symlinks*
+ parameter, use the ``in`` operator on ``supports_follow_symlinks``. As an
+ example, this expression determines whether the *follow_symlinks* parameter
+ of :func:`os.stat` is locally available::
+
+ os.stat in os.supports_follow_symlinks
+
+ .. versionadded:: 3.3
+
+
+.. function:: symlink(source, link_name, target_is_directory=False, *, dir_fd=None)
+
+ Create a symbolic link pointing to *source* named *link_name*.
+
+ On Windows, a symlink represents either a file or a directory, and does not
+ morph to the target dynamically. If *target_is_directory* is set to ``True``,
+ the symlink will be created as a directory symlink, otherwise as a file symlink
+ (the default). On non-Window platforms, *target_is_directory* is ignored.
Symbolic link support was introduced in Windows 6.0 (Vista). :func:`symlink`
will raise a :exc:`NotImplementedError` on Windows versions earlier than 6.0.
return self.thing
def __exit__(self, *exc_info):
self.thing.close()
- # in the list throw an exception
+
+
+# Inspired by discussions on http://bugs.python.org/issue13585
+class ExitStack(object):
+ """Context manager for dynamic management of a stack of exit callbacks
+
+ For example:
+
+ with ExitStack() as stack:
+ files = [stack.enter_context(open(fname)) for fname in filenames]
+ # All opened files will automatically be closed at the end of
+ # the with statement, even if attempts to open files later
++ # in the list raise an exception
+
+ """
+ def __init__(self):
+ self._exit_callbacks = deque()
+
+ def pop_all(self):
+ """Preserve the context stack by transferring it to a new instance"""
+ new_stack = type(self)()
+ new_stack._exit_callbacks = self._exit_callbacks
+ self._exit_callbacks = deque()
+ return new_stack
+
+ def _push_cm_exit(self, cm, cm_exit):
+ """Helper to correctly register callbacks to __exit__ methods"""
+ def _exit_wrapper(*exc_details):
+ return cm_exit(cm, *exc_details)
+ _exit_wrapper.__self__ = cm
+ self.push(_exit_wrapper)
+
+ def push(self, exit):
+ """Registers a callback with the standard __exit__ method signature
+
+ Can suppress exceptions the same way __exit__ methods can.
+
+ Also accepts any object with an __exit__ method (registering a call
+ to the method instead of the object itself)
+ """
+ # We use an unbound method rather than a bound method to follow
+ # the standard lookup behaviour for special methods
+ _cb_type = type(exit)
+ try:
+ exit_method = _cb_type.__exit__
+ except AttributeError:
+ # Not a context manager, so assume its a callable
+ self._exit_callbacks.append(exit)
+ else:
+ self._push_cm_exit(exit, exit_method)
+ return exit # Allow use as a decorator
+
+ def callback(self, callback, *args, **kwds):
+ """Registers an arbitrary callback and arguments.
+
+ Cannot suppress exceptions.
+ """
+ def _exit_wrapper(exc_type, exc, tb):
+ callback(*args, **kwds)
+ # We changed the signature, so using @wraps is not appropriate, but
+ # setting __wrapped__ may still help with introspection
+ _exit_wrapper.__wrapped__ = callback
+ self.push(_exit_wrapper)
+ return callback # Allow use as a decorator
+
+ def enter_context(self, cm):
+ """Enters the supplied context manager
+
+ If successful, also pushes its __exit__ method as a callback and
+ returns the result of the __enter__ method.
+ """
+ # We look up the special methods on the type to match the with statement
+ _cm_type = type(cm)
+ _exit = _cm_type.__exit__
+ result = _cm_type.__enter__(cm)
+ self._push_cm_exit(cm, _exit)
+ return result
+
+ def close(self):
+ """Immediately unwind the context stack"""
+ self.__exit__(None, None, None)
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, *exc_details):
+ # We manipulate the exception state so it behaves as though
+ # we were actually nesting multiple with statements
+ frame_exc = sys.exc_info()[1]
+ def _fix_exception_context(new_exc, old_exc):
+ while 1:
+ exc_context = new_exc.__context__
+ if exc_context in (None, frame_exc):
+ break
+ new_exc = exc_context
+ new_exc.__context__ = old_exc
+
+ # Callbacks are invoked in LIFO order to match the behaviour of
+ # nested context managers
+ suppressed_exc = False
+ while self._exit_callbacks:
+ cb = self._exit_callbacks.pop()
+ try:
+ if cb(*exc_details):
+ suppressed_exc = True
+ exc_details = (None, None, None)
+ except:
+ new_exc_details = sys.exc_info()
+ # simulate the stack of exceptions by setting the context
+ _fix_exception_context(new_exc_details[1], exc_details[1])
+ if not self._exit_callbacks:
+ raise
+ exc_details = new_exc_details
+ return suppressed_exc
If the first element of pair is false, then the second element is
returned unmodified.
+
+ Optional charset if given is the character set that is used to encode
+ realname in case realname is not ASCII safe. Can be an instance of str or
+ a Charset-like object which has a header_encode method. Default is
+ 'utf-8'.
"""
name, address = pair
- # The address MUST (per RFC) be ascii, so throw a UnicodeError if it isn't.
++ # The address MUST (per RFC) be ascii, so raise an UnicodeError if it isn't.
+ address.encode('ascii')
if name:
- quotes = ''
- if specialsre.search(name):
- quotes = '"'
- name = escapesre.sub(r'\\\g<0>', name)
- return '%s%s%s <%s>' % (quotes, name, quotes, address)
+ try:
+ name.encode('ascii')
+ except UnicodeEncodeError:
+ if isinstance(charset, str):
+ charset = Charset(charset)
+ encoded_name = charset.header_encode(name)
+ return "%s <%s>" % (encoded_name, address)
+ else:
+ quotes = ''
+ if specialsre.search(name):
+ quotes = '"'
+ name = escapesre.sub(r'\\\g<0>', name)
+ return '%s%s%s <%s>' % (quotes, name, quotes, address)
return address
"""IMAP4 client class over SSL connection
- Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile]]]])
+ Instantiate with: IMAP4_SSL([host[, port[, keyfile[, certfile[, ssl_context]]]]])
host - host's name (default: localhost);
- port - port number (default: standard IMAP4 SSL port).
+ port - port number (default: standard IMAP4 SSL port);
keyfile - PEM formatted file that contains your private key (default: None);
certfile - PEM formatted certificate chain file (default: None);
- certfile should not be set otherwise ValueError is thrown.
+ ssl_context - a SSLContext object that contains your certificate chain
+ and private key (default: None)
+ Note: if ssl_context is provided, then parameters keyfile or
++ certfile should not be set otherwise ValueError is raised.
for more documentation see the docstring of the parent class IMAP4.
"""
info('process shutting down')
debug('running all "atexit" finalizers with priority >= 0')
_run_finalizers(0)
+
if current_process() is not None:
# We check if the current process is None here because if
- # it's None, any call to ``active_children()`` will throw
- # it's None, any call to ``active_children()`` will raise an
- # AttributeError (active_children winds up trying to get
- # attributes from util._current_process). This happens in a
- # variety of shutdown circumstances that are not well-understood
- # because module-scope variables are not apparently supposed to
- # be destroyed until after this function is called. However,
- # they are indeed destroyed before this function is called. See
- # issues #9775 and #15881. Also related: #4106, #9205, and #9207.
++ # it's None, any call to ``active_children()`` will raise
+ # an AttributeError (active_children winds up trying to
+ # get attributes from util._current_process). One
+ # situation where this can happen is if someone has
+ # manipulated sys.modules, causing this module to be
+ # garbage collected. The destructor for the module type
+ # then replaces all values in the module dict with None.
+ # For instance, after setuptools runs a test it replaces
+ # sys.modules with a copy created earlier. See issues
+ # #9775 and #15881. Also related: #4106, #9205, and
+ # #9207.
+
for p in active_children():
if p._daemonic:
info('calling terminate() for daemon %s', p.name)
def find_loader(fullname):
"""Find a PEP 302 "loader" object for fullname
- If fullname contains dots, path must be the containing package's __path__.
- Returns None if the module cannot be found or imported. This function uses
- iter_importers(), and is thus subject to the same limitations regarding
- platform-specific special import locations such as the Windows registry.
+ This is s convenience wrapper around :func:`importlib.find_loader` that
+ sets the *path* argument correctly when searching for submodules, and
+ also ensures parent packages (if any) are imported before searching for
+ submodules.
"""
- for importer in iter_importers(fullname):
- loader = importer.find_module(fullname)
- if loader is not None:
- return loader
-
- return None
+ if fullname.startswith('.'):
+ msg = "Relative module name {!r} not supported".format(fullname)
+ raise ImportError(msg)
+ path = None
+ pkg_name = fullname.rpartition(".")[0]
+ if pkg_name:
+ pkg = importlib.import_module(pkg_name)
+ path = getattr(pkg, "__path__", None)
+ if path is None:
+ return None
+ try:
+ return importlib.find_loader(fullname, path)
+ except (ImportError, AttributeError, TypeError, ValueError) as ex:
+ # This hack fixes an impedance mismatch between pkgutil and
- # importlib, where the latter throws other errors for cases where
++ # importlib, where the latter raises other errors for cases where
+ # pkgutil previously threw ImportError
+ msg = "Error while finding loader for {!r} ({}: {})"
+ raise ImportError(msg.format(fullname, type(ex), ex)) from ex
def extend_path(path, name):
b'<?xml version="1.0" encoding="utf-8"?><foo>\xe2\x82\xac</foo>')
self.assertEqual(doc.toxml('iso-8859-15'),
b'<?xml version="1.0" encoding="iso-8859-15"?><foo>\xa4</foo>')
+ self.assertEqual(doc.toxml('us-ascii'),
+ b'<?xml version="1.0" encoding="us-ascii"?><foo>€</foo>')
+ self.assertEqual(doc.toxml('utf-16'),
+ '<?xml version="1.0" encoding="utf-16"?>'
+ '<foo>\u20ac</foo>'.encode('utf-16'))
- # Verify that character decoding errors throw exceptions instead
+ # Verify that character decoding errors raise exceptions instead
# of crashing
self.assertRaises(UnicodeDecodeError, parseString,
b'<fran\xe7ais>Comment \xe7a va ? Tr\xe8s bien ?</fran\xe7ais>')
result[getattr(stat, name)])
self.assertIn(attr, members)
+ # Make sure that the st_?time and st_?time_ns fields roughly agree
+ # (they should always agree up to around tens-of-microseconds)
+ for name in 'st_atime st_mtime st_ctime'.split():
+ floaty = int(getattr(result, name) * 100000)
+ nanosecondy = getattr(result, name + "_ns") // 10000
+ self.assertAlmostEqual(floaty, nanosecondy, delta=2)
+
try:
result[200]
- self.fail("No exception thrown")
+ self.fail("No exception raised")
except IndexError:
pass
set([int(x) for x in groups.split()]),
set(posix.getgroups() + [posix.getegid()]))
- posix.stat(support.TESTFN) # should not throw exception
+ # tests for the posix *at functions follow
+
+ @unittest.skipUnless(os.access in os.supports_dir_fd, "test needs dir_fd support for os.access()")
+ def test_access_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ self.assertTrue(posix.access(support.TESTFN, os.R_OK, dir_fd=f))
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.chmod in os.supports_dir_fd, "test needs dir_fd support in os.chmod()")
+ def test_chmod_dir_fd(self):
+ os.chmod(support.TESTFN, stat.S_IRUSR)
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.chmod(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
+
+ s = posix.stat(support.TESTFN)
+ self.assertEqual(s[0] & stat.S_IRWXU, stat.S_IRUSR | stat.S_IWUSR)
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.chown in os.supports_dir_fd, "test needs dir_fd support in os.chown()")
+ def test_chown_dir_fd(self):
+ support.unlink(support.TESTFN)
+ support.create_empty_file(support.TESTFN)
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.chown(support.TESTFN, os.getuid(), os.getgid(), dir_fd=f)
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.stat in os.supports_dir_fd, "test needs dir_fd support in os.stat()")
+ def test_stat_dir_fd(self):
+ support.unlink(support.TESTFN)
+ with open(support.TESTFN, 'w') as outfile:
+ outfile.write("testline\n")
+
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ s1 = posix.stat(support.TESTFN)
+ s2 = posix.stat(support.TESTFN, dir_fd=f)
+ self.assertEqual(s1, s2)
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.utime in os.supports_dir_fd, "test needs dir_fd support in os.utime()")
+ def test_utime_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ now = time.time()
+ posix.utime(support.TESTFN, None, dir_fd=f)
+ posix.utime(support.TESTFN, dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, now, dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, None), dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, None), dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (None, now), dir_fd=f)
+ self.assertRaises(TypeError, posix.utime, support.TESTFN, (now, "x"), dir_fd=f)
+ posix.utime(support.TESTFN, (int(now), int(now)), dir_fd=f)
+ posix.utime(support.TESTFN, (now, now), dir_fd=f)
+ posix.utime(support.TESTFN,
+ (int(now), int((now - int(now)) * 1e9)), dir_fd=f)
+ posix.utime(support.TESTFN, dir_fd=f,
+ times=(int(now), int((now - int(now)) * 1e9)))
+
+ # try dir_fd and follow_symlinks together
+ if os.utime in os.supports_follow_symlinks:
+ try:
+ posix.utime(support.TESTFN, follow_symlinks=False, dir_fd=f)
+ except ValueError:
+ # whoops! using both together not supported on this platform.
+ pass
+
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.link in os.supports_dir_fd, "test needs dir_fd support in os.link()")
+ def test_link_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.link(support.TESTFN, support.TESTFN + 'link', src_dir_fd=f, dst_dir_fd=f)
+ # should have same inodes
+ self.assertEqual(posix.stat(support.TESTFN)[1],
+ posix.stat(support.TESTFN + 'link')[1])
+ finally:
+ posix.close(f)
+ support.unlink(support.TESTFN + 'link')
+
+ @unittest.skipUnless(os.mkdir in os.supports_dir_fd, "test needs dir_fd support in os.mkdir()")
+ def test_mkdir_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mkdir(support.TESTFN + 'dir', dir_fd=f)
+ posix.stat(support.TESTFN + 'dir') # should not raise exception
+ finally:
+ posix.close(f)
+ support.rmtree(support.TESTFN + 'dir')
+
+ @unittest.skipUnless((os.mknod in os.supports_dir_fd) and hasattr(stat, 'S_IFIFO'),
+ "test requires both stat.S_IFIFO and dir_fd support for os.mknod()")
+ def test_mknod_dir_fd(self):
+ # Test using mknodat() to create a FIFO (the only use specified
+ # by POSIX).
+ support.unlink(support.TESTFN)
+ mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
+ except OSError as e:
+ # Some old systems don't allow unprivileged users to use
+ # mknod(), or only support creating device nodes.
+ self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
+ else:
+ self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.open in os.supports_dir_fd, "test needs dir_fd support in os.open()")
+ def test_open_dir_fd(self):
+ support.unlink(support.TESTFN)
+ with open(support.TESTFN, 'w') as outfile:
+ outfile.write("testline\n")
+ a = posix.open(posix.getcwd(), posix.O_RDONLY)
+ b = posix.open(support.TESTFN, posix.O_RDONLY, dir_fd=a)
+ try:
+ res = posix.read(b, 9).decode(encoding="utf-8")
+ self.assertEqual("testline\n", res)
+ finally:
+ posix.close(a)
+ posix.close(b)
+
+ @unittest.skipUnless(os.readlink in os.supports_dir_fd, "test needs dir_fd support in os.readlink()")
+ def test_readlink_dir_fd(self):
+ os.symlink(support.TESTFN, support.TESTFN + 'link')
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ self.assertEqual(posix.readlink(support.TESTFN + 'link'),
+ posix.readlink(support.TESTFN + 'link', dir_fd=f))
+ finally:
+ support.unlink(support.TESTFN + 'link')
+ posix.close(f)
+
+ @unittest.skipUnless(os.rename in os.supports_dir_fd, "test needs dir_fd support in os.rename()")
+ def test_rename_dir_fd(self):
+ support.unlink(support.TESTFN)
+ support.create_empty_file(support.TESTFN + 'ren')
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.rename(support.TESTFN + 'ren', support.TESTFN, src_dir_fd=f, dst_dir_fd=f)
+ except:
+ posix.rename(support.TESTFN + 'ren', support.TESTFN)
+ raise
+ else:
- posix.stat(support.TESTFN + 'del') # should not throw exception
++ posix.stat(support.TESTFN) # should not raise exception
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.symlink in os.supports_dir_fd, "test needs dir_fd support in os.symlink()")
+ def test_symlink_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.symlink(support.TESTFN, support.TESTFN + 'link', dir_fd=f)
+ self.assertEqual(posix.readlink(support.TESTFN + 'link'), support.TESTFN)
+ finally:
+ posix.close(f)
+ support.unlink(support.TESTFN + 'link')
+
+ @unittest.skipUnless(os.unlink in os.supports_dir_fd, "test needs dir_fd support in os.unlink()")
+ def test_unlink_dir_fd(self):
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ support.create_empty_file(support.TESTFN + 'del')
++ posix.stat(support.TESTFN + 'del') # should not raise exception
+ try:
+ posix.unlink(support.TESTFN + 'del', dir_fd=f)
+ except:
+ support.unlink(support.TESTFN + 'del')
+ raise
+ else:
+ self.assertRaises(OSError, posix.stat, support.TESTFN + 'link')
+ finally:
+ posix.close(f)
+
+ @unittest.skipUnless(os.mkfifo in os.supports_dir_fd, "test needs dir_fd support in os.mkfifo()")
+ def test_mkfifo_dir_fd(self):
+ support.unlink(support.TESTFN)
+ f = posix.open(posix.getcwd(), posix.O_RDONLY)
+ try:
+ posix.mkfifo(support.TESTFN, stat.S_IRUSR | stat.S_IWUSR, dir_fd=f)
+ self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
+ finally:
+ posix.close(f)
+
+ requires_sched_h = unittest.skipUnless(hasattr(posix, 'sched_yield'),
+ "don't have scheduling support")
+ requires_sched_affinity = unittest.skipUnless(hasattr(posix, 'sched_setaffinity'),
+ "don't have sched affinity support")
+
+ @requires_sched_h
+ def test_sched_yield(self):
+ # This has no error conditions (at least on Linux).
+ posix.sched_yield()
+
+ @requires_sched_h
+ @unittest.skipUnless(hasattr(posix, 'sched_get_priority_max'),
+ "requires sched_get_priority_max()")
+ def test_sched_priority(self):
+ # Round-robin usually has interesting priorities.
+ pol = posix.SCHED_RR
+ lo = posix.sched_get_priority_min(pol)
+ hi = posix.sched_get_priority_max(pol)
+ self.assertIsInstance(lo, int)
+ self.assertIsInstance(hi, int)
+ self.assertGreaterEqual(hi, lo)
+ # OSX evidently just returns 15 without checking the argument.
+ if sys.platform != "darwin":
+ self.assertRaises(OSError, posix.sched_get_priority_min, -23)
+ self.assertRaises(OSError, posix.sched_get_priority_max, -23)
+
+ @unittest.skipUnless(hasattr(posix, 'sched_setscheduler'), "can't change scheduler")
+ def test_get_and_set_scheduler_and_param(self):
+ possible_schedulers = [sched for name, sched in posix.__dict__.items()
+ if name.startswith("SCHED_")]
+ mine = posix.sched_getscheduler(0)
+ self.assertIn(mine, possible_schedulers)
+ try:
+ parent = posix.sched_getscheduler(os.getppid())
+ except OSError as e:
+ if e.errno != errno.EPERM:
+ raise
+ else:
+ self.assertIn(parent, possible_schedulers)
+ self.assertRaises(OSError, posix.sched_getscheduler, -1)
+ self.assertRaises(OSError, posix.sched_getparam, -1)
+ param = posix.sched_getparam(0)
+ self.assertIsInstance(param.sched_priority, int)
+ try:
+ posix.sched_setscheduler(0, mine, param)
+ except OSError as e:
+ if e.errno != errno.EPERM:
+ raise
+
+ # POSIX states that calling sched_setparam() on a process with a
+ # scheduling policy other than SCHED_FIFO or SCHED_RR is
+ # implementation-defined: FreeBSD returns EINVAL.
+ if not sys.platform.startswith('freebsd'):
+ posix.sched_setparam(0, param)
+ self.assertRaises(OSError, posix.sched_setparam, -1, param)
+
+ self.assertRaises(OSError, posix.sched_setscheduler, -1, mine, param)
+ self.assertRaises(TypeError, posix.sched_setscheduler, 0, mine, None)
+ self.assertRaises(TypeError, posix.sched_setparam, 0, 43)
+ param = posix.sched_param(None)
+ self.assertRaises(TypeError, posix.sched_setparam, 0, param)
+ large = 214748364700
+ param = posix.sched_param(large)
+ self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
+ param = posix.sched_param(sched_priority=-large)
+ self.assertRaises(OverflowError, posix.sched_setparam, 0, param)
+
+ @unittest.skipUnless(hasattr(posix, "sched_rr_get_interval"), "no function")
+ def test_sched_rr_get_interval(self):
+ try:
+ interval = posix.sched_rr_get_interval(0)
+ except OSError as e:
+ # This likely means that sched_rr_get_interval is only valid for
+ # processes with the SCHED_RR scheduler in effect.
+ if e.errno != errno.EINVAL:
+ raise
+ self.skipTest("only works on SCHED_RR processes")
+ self.assertIsInstance(interval, float)
+ # Reasonable constraints, I think.
+ self.assertGreaterEqual(interval, 0.)
+ self.assertLess(interval, 1.)
+
+ @requires_sched_affinity
+ def test_sched_getaffinity(self):
+ mask = posix.sched_getaffinity(0)
+ self.assertIsInstance(mask, set)
+ self.assertGreaterEqual(len(mask), 1)
+ self.assertRaises(OSError, posix.sched_getaffinity, -1)
+ for cpu in mask:
+ self.assertIsInstance(cpu, int)
+ self.assertGreaterEqual(cpu, 0)
+ self.assertLess(cpu, 1 << 32)
+
+ @requires_sched_affinity
+ def test_sched_setaffinity(self):
+ mask = posix.sched_getaffinity(0)
+ if len(mask) > 1:
+ # Empty masks are forbidden
+ mask.pop()
+ posix.sched_setaffinity(0, mask)
+ self.assertEqual(posix.sched_getaffinity(0), mask)
+ self.assertRaises(OSError, posix.sched_setaffinity, 0, [])
+ self.assertRaises(ValueError, posix.sched_setaffinity, 0, [-10])
+ self.assertRaises(OverflowError, posix.sched_setaffinity, 0, [1<<128])
+ self.assertRaises(OSError, posix.sched_setaffinity, -1, mask)
+
+ def test_rtld_constants(self):
+ # check presence of major RTLD_* constants
+ posix.RTLD_LAZY
+ posix.RTLD_NOW
+ posix.RTLD_GLOBAL
+ posix.RTLD_LOCAL
+
+ @unittest.skipUnless(hasattr(os, 'SEEK_HOLE'),
+ "test needs an OS that reports file holes")
+ def test_fs_holes(self):
+ # Even if the filesystem doesn't report holes,
+ # if the OS supports it the SEEK_* constants
+ # will be defined and will have a consistent
+ # behaviour:
+ # os.SEEK_DATA = current position
+ # os.SEEK_HOLE = end of file position
+ with open(support.TESTFN, 'r+b') as fp:
+ fp.write(b"hello")
+ fp.flush()
+ size = fp.tell()
+ fno = fp.fileno()
+ try :
+ for i in range(size):
+ self.assertEqual(i, os.lseek(fno, i, os.SEEK_DATA))
+ self.assertLessEqual(size, os.lseek(fno, i, os.SEEK_HOLE))
+ self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_DATA)
+ self.assertRaises(OSError, os.lseek, fno, size, os.SEEK_HOLE)
+ except OSError :
+ # Some OSs claim to support SEEK_HOLE/SEEK_DATA
+ # but it is not true.
+ # For instance:
+ # http://lists.freebsd.org/pipermail/freebsd-amd64/2012-January/014332.html
+ raise unittest.SkipTest("OSError raised!")
+
class PosixGroupsTester(unittest.TestCase):
def setUp(self):
except UnicodeDecodeError:
return '%s : %s' % (safe_repr(standardMsg), safe_repr(msg))
-
def assertRaises(self, excClass, callableObj=None, *args, **kwargs):
- """Fail unless an exception of class excClass is thrown
+ """Fail unless an exception of class excClass is raised
by callableObj when invoked with arguments args and keyword
arguments kwargs. If a different type of exception is
- thrown, it will not be caught, and the test case will be
+ raised, it will not be caught, and the test case will be
deemed to have suffered an error, exactly as for an
unexpected exception.
#endif
#endif
+
+#ifdef MS_WINDOWS
+static int
+win32_warn_bytes_api()
+{
+ return PyErr_WarnEx(PyExc_DeprecationWarning,
+ "The Windows bytes API has been deprecated, "
+ "use Unicode filenames instead",
+ 1);
+}
+#endif
+
+
+#ifdef AT_FDCWD
+/*
+ * Why the (int) cast? Solaris 10 defines AT_FDCWD as 0xffd19553 (-3041965);
+ * without the int cast, the value gets interpreted as uint (4291925331),
+ * which doesn't play nicely with all the initializer lines in this file that
+ * look like this:
+ * int dir_fd = DEFAULT_DIR_FD;
+ */
+#define DEFAULT_DIR_FD (int)AT_FDCWD
+#else
+#define DEFAULT_DIR_FD (-100)
+#endif
+
+static int
+_fd_converter(PyObject *o, int *p, int default_value) {
+ long long_value;
+ if (o == Py_None) {
+ *p = default_value;
+ return 1;
+ }
+ if (PyFloat_Check(o)) {
+ PyErr_SetString(PyExc_TypeError,
+ "integer argument expected, got float" );
+ return 0;
+ }
+ long_value = PyLong_AsLong(o);
+ if (long_value == -1 && PyErr_Occurred())
+ return 0;
+ if (long_value > INT_MAX) {
+ PyErr_SetString(PyExc_OverflowError,
+ "signed integer is greater than maximum");
+ return 0;
+ }
+ if (long_value < INT_MIN) {
+ PyErr_SetString(PyExc_OverflowError,
+ "signed integer is less than minimum");
+ return 0;
+ }
+ *p = (int)long_value;
+ return 1;
+}
+
+static int
+dir_fd_converter(PyObject *o, void *p) {
+ return _fd_converter(o, (int *)p, DEFAULT_DIR_FD);
+}
+
+
+
+/*
+ * A PyArg_ParseTuple "converter" function
+ * that handles filesystem paths in the manner
+ * preferred by the os module.
+ *
+ * path_converter accepts (Unicode) strings and their
+ * subclasses, and bytes and their subclasses. What
+ * it does with the argument depends on the platform:
+ *
+ * * On Windows, if we get a (Unicode) string we
+ * extract the wchar_t * and return it; if we get
+ * bytes we extract the char * and return that.
+ *
+ * * On all other platforms, strings are encoded
+ * to bytes using PyUnicode_FSConverter, then we
+ * extract the char * from the bytes object and
+ * return that.
+ *
+ * path_converter also optionally accepts signed
+ * integers (representing open file descriptors) instead
+ * of path strings.
+ *
+ * Input fields:
+ * path.nullable
+ * If nonzero, the path is permitted to be None.
+ * path.allow_fd
+ * If nonzero, the path is permitted to be a file handle
+ * (a signed int) instead of a string.
+ * path.function_name
+ * If non-NULL, path_converter will use that as the name
+ * of the function in error messages.
+ * (If path.argument_name is NULL it omits the function name.)
+ * path.argument_name
+ * If non-NULL, path_converter will use that as the name
+ * of the parameter in error messages.
+ * (If path.argument_name is NULL it uses "path".)
+ *
+ * Output fields:
+ * path.wide
+ * Points to the path if it was expressed as Unicode
+ * and was not encoded. (Only used on Windows.)
+ * path.narrow
+ * Points to the path if it was expressed as bytes,
+ * or it was Unicode and was encoded to bytes.
+ * path.fd
+ * Contains a file descriptor if path.accept_fd was true
+ * and the caller provided a signed integer instead of any
+ * sort of string.
+ *
+ * WARNING: if your "path" parameter is optional, and is
+ * unspecified, path_converter will never get called.
+ * So if you set allow_fd, you *MUST* initialize path.fd = -1
+ * yourself!
+ * path.length
+ * The length of the path in characters, if specified as
+ * a string.
+ * path.object
+ * The original object passed in.
+ * path.cleanup
+ * For internal use only. May point to a temporary object.
+ * (Pay no attention to the man behind the curtain.)
+ *
+ * At most one of path.wide or path.narrow will be non-NULL.
+ * If path was None and path.nullable was set,
+ * or if path was an integer and path.allow_fd was set,
+ * both path.wide and path.narrow will be NULL
+ * and path.length will be 0.
+ *
+ * path_converter takes care to not write to the path_t
+ * unless it's successful. However it must reset the
+ * "cleanup" field each time it's called.
+ *
+ * Use as follows:
+ * path_t path;
+ * memset(&path, 0, sizeof(path));
+ * PyArg_ParseTuple(args, "O&", path_converter, &path);
+ * // ... use values from path ...
+ * path_cleanup(&path);
+ *
+ * (Note that if PyArg_Parse fails you don't need to call
+ * path_cleanup(). However it is safe to do so.)
+ */
+typedef struct {
+ char *function_name;
+ char *argument_name;
+ int nullable;
+ int allow_fd;
+ wchar_t *wide;
+ char *narrow;
+ int fd;
+ Py_ssize_t length;
+ PyObject *object;
+ PyObject *cleanup;
+} path_t;
+
+static void
+path_cleanup(path_t *path) {
+ if (path->cleanup) {
+ Py_DECREF(path->cleanup);
+ path->cleanup = NULL;
+ }
+}
+
+static int
+path_converter(PyObject *o, void *p) {
+ path_t *path = (path_t *)p;
+ PyObject *unicode, *bytes;
+ Py_ssize_t length;
+ char *narrow;
+
+#define FORMAT_EXCEPTION(exc, fmt) \
+ PyErr_Format(exc, "%s%s" fmt, \
+ path->function_name ? path->function_name : "", \
+ path->function_name ? ": " : "", \
+ path->argument_name ? path->argument_name : "path")
+
+ /* Py_CLEANUP_SUPPORTED support */
+ if (o == NULL) {
+ path_cleanup(path);
+ return 1;
+ }
+
+ /* ensure it's always safe to call path_cleanup() */
+ path->cleanup = NULL;
+
+ if (o == Py_None) {
+ if (!path->nullable) {
+ FORMAT_EXCEPTION(PyExc_TypeError,
+ "can't specify None for %s argument");
+ return 0;
+ }
+ path->wide = NULL;
+ path->narrow = NULL;
+ path->length = 0;
+ path->object = o;
+ path->fd = -1;
+ return 1;
+ }
+
+ unicode = PyUnicode_FromObject(o);
+ if (unicode) {
+#ifdef MS_WINDOWS
+ wchar_t *wide;
+ length = PyUnicode_GET_SIZE(unicode);
+ if (length > 32767) {
+ FORMAT_EXCEPTION(PyExc_ValueError, "%s too long for Windows");
+ Py_DECREF(unicode);
+ return 0;
+ }
+
+ wide = PyUnicode_AsUnicode(unicode);
+ if (!wide) {
+ Py_DECREF(unicode);
+ return 0;
+ }
+
+ path->wide = wide;
+ path->narrow = NULL;
+ path->length = length;
+ path->object = o;
+ path->fd = -1;
+ path->cleanup = unicode;
+ return Py_CLEANUP_SUPPORTED;
+#else
+ int converted = PyUnicode_FSConverter(unicode, &bytes);
+ Py_DECREF(unicode);
+ if (!converted)
+ bytes = NULL;
+#endif
+ }
+ else {
+ PyErr_Clear();
+ bytes = PyBytes_FromObject(o);
+ if (!bytes) {
+ PyErr_Clear();
+ if (path->allow_fd) {
+ int fd;
+ /*
+ * note: _fd_converter always permits None.
+ * but we've already done our None check.
+ * so o cannot be None at this point.
+ */
+ int result = _fd_converter(o, &fd, -1);
+ if (result) {
+ path->wide = NULL;
+ path->narrow = NULL;
+ path->length = 0;
+ path->object = o;
+ path->fd = fd;
+ return result;
+ }
+ }
+ }
+ }
+
+ if (!bytes) {
+ if (!PyErr_Occurred())
+ FORMAT_EXCEPTION(PyExc_TypeError, "illegal type for %s parameter");
+ return 0;
+ }
+
+#ifdef MS_WINDOWS
+ if (win32_warn_bytes_api()) {
+ Py_DECREF(bytes);
+ return 0;
+ }
+#endif
+
+ length = PyBytes_GET_SIZE(bytes);
+#ifdef MS_WINDOWS
+ if (length > MAX_PATH) {
+ FORMAT_EXCEPTION(PyExc_ValueError, "%s too long for Windows");
+ Py_DECREF(bytes);
+ return 0;
+ }
+#endif
+
+ narrow = PyBytes_AS_STRING(bytes);
+ if (length != strlen(narrow)) {
+ FORMAT_EXCEPTION(PyExc_ValueError, "embedded NUL character in %s");
+ Py_DECREF(bytes);
+ return 0;
+ }
+
+ path->wide = NULL;
+ path->narrow = narrow;
+ path->length = length;
+ path->object = o;
+ path->fd = -1;
+ path->cleanup = bytes;
+ return Py_CLEANUP_SUPPORTED;
+}
+
+static void
+argument_unavailable_error(char *function_name, char *argument_name) {
+ PyErr_Format(PyExc_NotImplementedError,
+ "%s%s%s unavailable on this platform",
+ (function_name != NULL) ? function_name : "",
+ (function_name != NULL) ? ": ": "",
+ argument_name);
+}
+
+static int
+dir_fd_unavailable(PyObject *o, void *p) {
+ int *dir_fd = (int *)p;
+ int return_value = _fd_converter(o, dir_fd, DEFAULT_DIR_FD);
+ if (!return_value)
+ return 0;
+ if (*dir_fd == DEFAULT_DIR_FD)
+ return 1;
+ argument_unavailable_error(NULL, "dir_fd");
+ return 0;
+}
+
+static int
+fd_specified(char *function_name, int fd) {
+ if (fd == -1)
+ return 0;
+
+ argument_unavailable_error(function_name, "fd");
+ return 1;
+}
+
+static int
+follow_symlinks_specified(char *function_name, int follow_symlinks) {
+ if (follow_symlinks)
+ return 0;
+
+ argument_unavailable_error(function_name, "follow_symlinks");
+ return 1;
+}
+
+static int
+path_and_dir_fd_invalid(char *function_name, path_t *path, int dir_fd) {
+ if (!path->narrow && !path->wide && (dir_fd != DEFAULT_DIR_FD)) {
+ PyErr_Format(PyExc_ValueError,
+ "%s: can't specify dir_fd without matching path",
+ function_name);
+ return 1;
+ }
+ return 0;
+}
+
+static int
+dir_fd_and_fd_invalid(char *function_name, int dir_fd, int fd) {
+ if ((dir_fd != DEFAULT_DIR_FD) && (fd != -1)) {
+ PyErr_Format(PyExc_ValueError,
+ "%s: can't specify both dir_fd and fd",
+ function_name);
+ return 1;
+ }
+ return 0;
+}
+
+static int
+fd_and_follow_symlinks_invalid(char *function_name, int fd,
+ int follow_symlinks) {
+ if ((fd > 0) && (!follow_symlinks)) {
+ PyErr_Format(PyExc_ValueError,
+ "%s: cannot use fd and follow_symlinks together",
+ function_name);
+ return 1;
+ }
+ return 0;
+}
+
+static int
+dir_fd_and_follow_symlinks_invalid(char *function_name, int dir_fd,
+ int follow_symlinks) {
+ if ((dir_fd != DEFAULT_DIR_FD) && (!follow_symlinks)) {
+ PyErr_Format(PyExc_ValueError,
+ "%s: cannot use dir_fd and follow_symlinks together",
+ function_name);
+ return 1;
+ }
+ return 0;
+}
+
+/* A helper used by a number of POSIX-only functions */
+#ifndef MS_WINDOWS
+static int
+_parse_off_t(PyObject* arg, void* addr)
+{
+#if !defined(HAVE_LARGEFILE_SUPPORT)
+ *((off_t*)addr) = PyLong_AsLong(arg);
+#else
+ *((off_t*)addr) = PyLong_AsLongLong(arg);
+#endif
+ if (PyErr_Occurred())
+ return 0;
+ return 1;
+}
+#endif
+
#if defined _MSC_VER && _MSC_VER >= 1400
/* Microsoft CRT in VS2005 and higher will verify that a filehandle is
- * valid and throw an assertion if it isn't.
+ * valid and raise an assertion if it isn't.
* Normally, an invalid fd is likely to be a C program error and therefore
* an assertion can be useful, but it does contradict the POSIX standard
* which for write(2) states:
_pickle.Pickler(io.BytesIO(), protocol=-1).dump(l)
_cache[n] = l
- # it still throws RuntimeError even at higher recursion limits
+def test_compiler_recursion():
+ # The compiler uses a scaling factor to support additional levels
+ # of recursion. This is a sanity check of that scaling to ensure
++ # it still raises RuntimeError even at higher recursion limits
+ compile("()" * (10 * sys.getrecursionlimit()), "<single>", "single")
+
def check_limit(n, test_func_name):
sys.setrecursionlimit(n)
if test_func_name.startswith("test_"):