]> granicus.if.org Git - python/commitdiff
Issue #24867: Fix Task.get_stack() for 'async def' coroutines
authorYury Selivanov <yselivanov@sprymix.com>
Fri, 14 Aug 2015 19:30:59 +0000 (15:30 -0400)
committerYury Selivanov <yselivanov@sprymix.com>
Fri, 14 Aug 2015 19:30:59 +0000 (15:30 -0400)
Lib/asyncio/tasks.py
Lib/test/test_asyncio/test_tasks.py

index 9bfc1cf81479c68b271ab81a1812436e9201401f..a235e742e234d5fcd4e449a33c7e9095499a820e 100644 (file)
@@ -128,7 +128,11 @@ class Task(futures.Future):
         returned for a suspended coroutine.
         """
         frames = []
-        f = self._coro.gi_frame
+        try:
+            # 'async def' coroutines
+            f = self._coro.cr_frame
+        except AttributeError:
+            f = self._coro.gi_frame
         if f is not None:
             while f is not None:
                 if limit is not None:
index 251192acee4029b9c5c19580e4de1b922471e771..04267873103a0bdea1be26abdeb64c7a6d0c8c71 100644 (file)
@@ -2,6 +2,7 @@
 
 import contextlib
 import functools
+import io
 import os
 import re
 import sys
@@ -162,6 +163,37 @@ class TaskTests(test_utils.TestCase):
                                    'function is deprecated, use ensure_'):
             self.assertIs(f, asyncio.async(f))
 
+    def test_get_stack(self):
+        T = None
+
+        @asyncio.coroutine
+        def foo():
+            yield from bar()
+
+        @asyncio.coroutine
+        def bar():
+            # test get_stack()
+            f = T.get_stack(limit=1)
+            try:
+                self.assertEqual(f[0].f_code.co_name, 'foo')
+            finally:
+                f = None
+
+            # test print_stack()
+            file = io.StringIO()
+            T.print_stack(limit=1, file=file)
+            file.seek(0)
+            tb = file.read()
+            self.assertRegex(tb, r'foo\(\) running')
+
+        @asyncio.coroutine
+        def runner():
+            nonlocal T
+            T = asyncio.ensure_future(foo(), loop=self.loop)
+            yield from T
+
+        self.loop.run_until_complete(runner())
+
     def test_task_repr(self):
         self.loop.set_debug(False)