collisions = k - occupied
var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
return float(collisions), float(var.sqrt())
+
+
+class catch_unraisable_exception:
+ """
+ Context manager catching unraisable exception using sys.unraisablehook.
+
+ Usage:
+
+ with support.catch_unraisable_exception() as cm:
+ ...
+
+ # check the expected unraisable exception: use cm.unraisable
+ ...
+
+ # cm.unraisable is None here (to break a reference cycle)
+ """
+
+ def __init__(self):
+ self.unraisable = None
+ self._old_hook = None
+
+ def _hook(self, unraisable):
+ self.unraisable = unraisable
+
+ def __enter__(self):
+ self._old_hook = sys.unraisablehook
+ sys.unraisablehook = self._hook
+ return self
+
+ def __exit__(self, *exc_info):
+ # Clear the unraisable exception to explicitly break a reference cycle
+ self.unraisable = None
+ sys.unraisablehook = self._old_hook
orig_wuc = warnings._warn_unawaited_coroutine
try:
warnings._warn_unawaited_coroutine = lambda coro: 1/0
- with support.captured_stderr() as stream:
- corofn()
+ with support.catch_unraisable_exception() as cm, \
+ support.captured_stderr() as stream:
+ # only store repr() to avoid keeping the coroutine alive
+ coro = corofn()
+ coro_repr = repr(coro)
+
+ # clear reference to the coroutine without awaiting for it
+ del coro
support.gc_collect()
- self.assertIn("Exception ignored in", stream.getvalue())
- self.assertIn("ZeroDivisionError", stream.getvalue())
- self.assertIn("was never awaited", stream.getvalue())
+
+ self.assertEqual(repr(cm.unraisable.object), coro_repr)
+ self.assertEqual(cm.unraisable.exc_type, ZeroDivisionError)
+ self.assertIn("was never awaited", stream.getvalue())
del warnings._warn_unawaited_coroutine
with support.captured_stderr() as stream:
check_warnings, cpython_only, gc_collect, run_unittest,
no_tracing, unlink, import_module, script_helper,
SuppressCrashReport)
+from test import support
+
+
class NaiveException(Exception):
def __init__(self, x):
self.x = x
# The following line is included in the traceback report:
raise exc
- class BrokenExceptionDel:
- def __del__(self):
- exc = BrokenStrException()
- # The following line is included in the traceback report:
- raise exc
+ obj = BrokenDel()
+ with support.catch_unraisable_exception() as cm:
+ del obj
- for test_class in (BrokenDel, BrokenExceptionDel):
- with self.subTest(test_class):
- obj = test_class()
- with captured_stderr() as stderr:
- del obj
- report = stderr.getvalue()
- self.assertIn("Exception ignored", report)
- self.assertIn(test_class.__del__.__qualname__, report)
- self.assertIn("test_exceptions.py", report)
- self.assertIn("raise exc", report)
- if test_class is BrokenExceptionDel:
- self.assertIn("BrokenStrException", report)
- self.assertIn("<exception str() failed>", report)
- else:
- self.assertIn("ValueError", report)
- self.assertIn("del is broken", report)
- self.assertTrue(report.endswith("\n"))
+ self.assertEqual(cm.unraisable.object, BrokenDel.__del__)
+ self.assertIsNotNone(cm.unraisable.exc_traceback)
def test_unhandled(self):
# Check for sensible reporting of unhandled exceptions
printing warnings and to doublecheck that we actually tested what we wanted
to test.
->>> import sys, io
->>> old = sys.stderr
->>> try:
-... sys.stderr = io.StringIO()
-... class Leaker:
-... def __del__(self):
-... def invoke(message):
-... raise RuntimeError(message)
-... invoke("test")
+>>> from test import support
+>>> class Leaker:
+... def __del__(self):
+... def invoke(message):
+... raise RuntimeError(message)
+... invoke("del failed")
...
+>>> with support.catch_unraisable_exception() as cm:
... l = Leaker()
... del l
-... err = sys.stderr.getvalue().strip()
-... "Exception ignored in" in err
-... "RuntimeError: test" in err
-... "Traceback" in err
-... "in invoke" in err
-... finally:
-... sys.stderr = old
+...
+... cm.unraisable.object == Leaker.__del__
+... cm.unraisable.exc_type == RuntimeError
+... str(cm.unraisable.exc_value) == "del failed"
+... cm.unraisable.exc_traceback is not None
True
True
True
self.assertIn('Traceback (most recent call last):\n', err)
self.assertIn('ValueError: 42\n', err)
+ def test_original_unraisablehook_err(self):
+ # bpo-22836: PyErr_WriteUnraisable() should give sensible reports
+ class BrokenDel:
+ def __del__(self):
+ exc = ValueError("del is broken")
+ # The following line is included in the traceback report:
+ raise exc
+
+ class BrokenStrException(Exception):
+ def __str__(self):
+ raise Exception("str() is broken")
+
+ class BrokenExceptionDel:
+ def __del__(self):
+ exc = BrokenStrException()
+ # The following line is included in the traceback report:
+ raise exc
+
+ for test_class in (BrokenDel, BrokenExceptionDel):
+ with self.subTest(test_class):
+ obj = test_class()
+ with test.support.captured_stderr() as stderr, \
+ test.support.swap_attr(sys, 'unraisablehook',
+ sys.__unraisablehook__):
+ # Trigger obj.__del__()
+ del obj
+
+ report = stderr.getvalue()
+ self.assertIn("Exception ignored", report)
+ self.assertIn(test_class.__del__.__qualname__, report)
+ self.assertIn("test_sys.py", report)
+ self.assertIn("raise exc", report)
+ if test_class is BrokenExceptionDel:
+ self.assertIn("BrokenStrException", report)
+ self.assertIn("<exception str() failed>", report)
+ else:
+ self.assertIn("ValueError", report)
+ self.assertIn("del is broken", report)
+ self.assertTrue(report.endswith("\n"))
+
+
def test_original_unraisablehook_wrong_type(self):
exc = ValueError(42)
with test.support.swap_attr(sys, 'unraisablehook',
--- /dev/null
+Add :func:`test.support.catch_unraisable_exception`: context manager
+catching unraisable exception using :func:`sys.unraisablehook`.