import functools
import inspect
+import opcode
import os
import sys
import traceback
+import types
from . import events
from . import futures
from .log import logger
+
+# Opcode of "yield from" instruction
+_YIELD_FROM = opcode.opmap['YIELD_FROM']
+
# If you set _DEBUG to true, @coroutine will wrap the resulting
# generator objects in a CoroWrapper instance (defined below). That
# instance will log a message when the generator is never iterated
_PY35 = (sys.version_info >= (3, 5))
+
+# Check for CPython issue #21209
+def has_yield_from_bug():
+ class MyGen:
+ def __init__(self):
+ self.send_args = None
+ def __iter__(self):
+ return self
+ def __next__(self):
+ return 42
+ def send(self, *what):
+ self.send_args = what
+ return None
+ def yield_from_gen(gen):
+ yield from gen
+ value = (1, 2, 3)
+ gen = MyGen()
+ coro = yield_from_gen(gen)
+ next(coro)
+ coro.send(value)
+ return gen.send_args != (value,)
+_YIELD_FROM_BUG = has_yield_from_bug()
+del has_yield_from_bug
+
+
class CoroWrapper:
# Wrapper for coroutine in _DEBUG mode.
def __next__(self):
return next(self.gen)
- def send(self, *value):
- # We use `*value` because of a bug in CPythons prior
- # to 3.4.1. See issue #21209 and test_yield_from_corowrapper
- # for details. This workaround should be removed in 3.5.0.
- if len(value) == 1:
- value = value[0]
- return self.gen.send(value)
+ if _YIELD_FROM_BUG:
+ # For for CPython issue #21209: using "yield from" and a custom
+ # generator, generator.send(tuple) unpacks the tuple instead of passing
+ # the tuple unchanged. Check if the caller is a generator using "yield
+ # from" to decide if the parameter should be unpacked or not.
+ def send(self, *value):
+ frame = sys._getframe()
+ caller = frame.f_back
+ assert caller.f_lasti >= 0
+ if caller.f_code.co_code[caller.f_lasti] != _YIELD_FROM:
+ value = value[0]
+ return self.gen.send(value)
+ else:
+ def send(self, value):
+ return self.gen.send(value)
def throw(self, exc):
return self.gen.throw(exc)
return getattr(func, '_is_coroutine', False)
+_COROUTINE_TYPES = (CoroWrapper, types.GeneratorType)
+
def iscoroutine(obj):
"""Return True if obj is a coroutine object."""
- return isinstance(obj, CoroWrapper) or inspect.isgenerator(obj)
+ return isinstance(obj, _COROUTINE_TYPES)
def _format_coroutine(coro):