bpo-36829: Add test.support.catch_unraisable_exception() (GH-13490)
authorVictor Stinner <vstinner@redhat.com>
Wed, 22 May 2019 21:44:02 +0000 (23:44 +0200)
committerGitHub <noreply@github.com>
Wed, 22 May 2019 21:44:02 +0000 (23:44 +0200)
* Copy test_exceptions.test_unraisable() to
  test_sys.UnraisableHookTest().
* Use catch_unraisable_exception() in test_coroutines,
  test_exceptions, test_generators.

Lib/test/support/__init__.py
Lib/test/test_coroutines.py
Lib/test/test_exceptions.py
Lib/test/test_generators.py
Lib/test/test_sys.py
Misc/NEWS.d/next/Tests/2019-05-22-12-57-15.bpo-36829.e9mRWC.rst [new file with mode: 0644]

index 9e60d960ab12f4dd8a3f61db5b3d10eda6861e7c..2fe9d9dc80998f23653516b62cb1bed345026a5e 100644 (file)
@@ -3034,3 +3034,36 @@ def collision_stats(nbins, nballs):
         collisions = k - occupied
         var = dn*(dn-1)*((dn-2)/dn)**k + meanempty * (1 - meanempty)
         return float(collisions), float(var.sqrt())
+
+
+class catch_unraisable_exception:
+    """
+    Context manager catching unraisable exception using sys.unraisablehook.
+
+    Usage:
+
+        with support.catch_unraisable_exception() as cm:
+            ...
+
+            # check the expected unraisable exception: use cm.unraisable
+            ...
+
+        # cm.unraisable is None here (to break a reference cycle)
+    """
+
+    def __init__(self):
+        self.unraisable = None
+        self._old_hook = None
+
+    def _hook(self, unraisable):
+        self.unraisable = unraisable
+
+    def __enter__(self):
+        self._old_hook = sys.unraisablehook
+        sys.unraisablehook = self._hook
+        return self
+
+    def __exit__(self, *exc_info):
+        # Clear the unraisable exception to explicitly break a reference cycle
+        self.unraisable = None
+        sys.unraisablehook = self._old_hook
index 8443e658a620b93c1d78f6fc5d388e46f3f188a2..036f13fa50e9ae2b2fa85aac5ded3b83ec13549f 100644 (file)
@@ -2342,12 +2342,19 @@ class OriginTrackingTest(unittest.TestCase):
         orig_wuc = warnings._warn_unawaited_coroutine
         try:
             warnings._warn_unawaited_coroutine = lambda coro: 1/0
-            with support.captured_stderr() as stream:
-                corofn()
+            with support.catch_unraisable_exception() as cm, \
+                 support.captured_stderr() as stream:
+                # only store repr() to avoid keeping the coroutine alive
+                coro = corofn()
+                coro_repr = repr(coro)
+
+                # clear reference to the coroutine without awaiting for it
+                del coro
                 support.gc_collect()
-            self.assertIn("Exception ignored in", stream.getvalue())
-            self.assertIn("ZeroDivisionError", stream.getvalue())
-            self.assertIn("was never awaited", stream.getvalue())
+
+                self.assertEqual(repr(cm.unraisable.object), coro_repr)
+                self.assertEqual(cm.unraisable.exc_type, ZeroDivisionError)
+                self.assertIn("was never awaited", stream.getvalue())
 
             del warnings._warn_unawaited_coroutine
             with support.captured_stderr() as stream:
index 6ef529e2b015be884f0588e46d86c7cb48a4531a..d7e11d2d30a87c30b6461b1b5bac7fb7cfc053d3 100644 (file)
@@ -12,6 +12,9 @@ from test.support import (TESTFN, captured_stderr, check_impl_detail,
                           check_warnings, cpython_only, gc_collect, run_unittest,
                           no_tracing, unlink, import_module, script_helper,
                           SuppressCrashReport)
+from test import support
+
+
 class NaiveException(Exception):
     def __init__(self, x):
         self.x = x
@@ -1181,29 +1184,12 @@ class ExceptionTests(unittest.TestCase):
                 # The following line is included in the traceback report:
                 raise exc
 
-        class BrokenExceptionDel:
-            def __del__(self):
-                exc = BrokenStrException()
-                # The following line is included in the traceback report:
-                raise exc
+        obj = BrokenDel()
+        with support.catch_unraisable_exception() as cm:
+            del obj
 
-        for test_class in (BrokenDel, BrokenExceptionDel):
-            with self.subTest(test_class):
-                obj = test_class()
-                with captured_stderr() as stderr:
-                    del obj
-                report = stderr.getvalue()
-                self.assertIn("Exception ignored", report)
-                self.assertIn(test_class.__del__.__qualname__, report)
-                self.assertIn("test_exceptions.py", report)
-                self.assertIn("raise exc", report)
-                if test_class is BrokenExceptionDel:
-                    self.assertIn("BrokenStrException", report)
-                    self.assertIn("<exception str() failed>", report)
-                else:
-                    self.assertIn("ValueError", report)
-                    self.assertIn("del is broken", report)
-                self.assertTrue(report.endswith("\n"))
+            self.assertEqual(cm.unraisable.object, BrokenDel.__del__)
+            self.assertIsNotNone(cm.unraisable.exc_traceback)
 
     def test_unhandled(self):
         # Check for sensible reporting of unhandled exceptions
index 320793c7dab69bf5f350e87ca45a2c4e2acb4022..7f1472fa03ac306f48b034dafc6372390fd9dc53 100644 (file)
@@ -2156,25 +2156,21 @@ explicitly, without generators. We do have to redirect stderr to avoid
 printing warnings and to doublecheck that we actually tested what we wanted
 to test.
 
->>> import sys, io
->>> old = sys.stderr
->>> try:
-...     sys.stderr = io.StringIO()
-...     class Leaker:
-...         def __del__(self):
-...             def invoke(message):
-...                 raise RuntimeError(message)
-...             invoke("test")
+>>> from test import support
+>>> class Leaker:
+...     def __del__(self):
+...         def invoke(message):
+...             raise RuntimeError(message)
+...         invoke("del failed")
 ...
+>>> with support.catch_unraisable_exception() as cm:
 ...     l = Leaker()
 ...     del l
-...     err = sys.stderr.getvalue().strip()
-...     "Exception ignored in" in err
-...     "RuntimeError: test" in err
-...     "Traceback" in err
-...     "in invoke" in err
-... finally:
-...     sys.stderr = old
+...
+...     cm.unraisable.object == Leaker.__del__
+...     cm.unraisable.exc_type == RuntimeError
+...     str(cm.unraisable.exc_value) == "del failed"
+...     cm.unraisable.exc_traceback is not None
 True
 True
 True
index 2b358ca0466e6e70f7e7f5d9bc1efd9e4031aa46..67a952d9b4544ccc223e339b150ba0c2cb43b078 100644 (file)
@@ -909,6 +909,47 @@ class UnraisableHookTest(unittest.TestCase):
         self.assertIn('Traceback (most recent call last):\n', err)
         self.assertIn('ValueError: 42\n', err)
 
+    def test_original_unraisablehook_err(self):
+        # bpo-22836: PyErr_WriteUnraisable() should give sensible reports
+        class BrokenDel:
+            def __del__(self):
+                exc = ValueError("del is broken")
+                # The following line is included in the traceback report:
+                raise exc
+
+        class BrokenStrException(Exception):
+            def __str__(self):
+                raise Exception("str() is broken")
+
+        class BrokenExceptionDel:
+            def __del__(self):
+                exc = BrokenStrException()
+                # The following line is included in the traceback report:
+                raise exc
+
+        for test_class in (BrokenDel, BrokenExceptionDel):
+            with self.subTest(test_class):
+                obj = test_class()
+                with test.support.captured_stderr() as stderr, \
+                     test.support.swap_attr(sys, 'unraisablehook',
+                                            sys.__unraisablehook__):
+                    # Trigger obj.__del__()
+                    del obj
+
+                report = stderr.getvalue()
+                self.assertIn("Exception ignored", report)
+                self.assertIn(test_class.__del__.__qualname__, report)
+                self.assertIn("test_sys.py", report)
+                self.assertIn("raise exc", report)
+                if test_class is BrokenExceptionDel:
+                    self.assertIn("BrokenStrException", report)
+                    self.assertIn("<exception str() failed>", report)
+                else:
+                    self.assertIn("ValueError", report)
+                    self.assertIn("del is broken", report)
+                self.assertTrue(report.endswith("\n"))
+
+
     def test_original_unraisablehook_wrong_type(self):
         exc = ValueError(42)
         with test.support.swap_attr(sys, 'unraisablehook',
diff --git a/Misc/NEWS.d/next/Tests/2019-05-22-12-57-15.bpo-36829.e9mRWC.rst b/Misc/NEWS.d/next/Tests/2019-05-22-12-57-15.bpo-36829.e9mRWC.rst
new file mode 100644 (file)
index 0000000..4ab342b
--- /dev/null
@@ -0,0 +1,2 @@
+Add :func:`test.support.catch_unraisable_exception`: context manager
+catching unraisable exception using :func:`sys.unraisablehook`.