"""
self._check_closed()
- new_task = not isinstance(future, futures.Future)
+ new_task = not futures.isfuture(future)
future = tasks.ensure_future(future, loop=self)
if new_task:
# An exception is raised if the future didn't complete, so there
@functools.wraps(func)
def coro(*args, **kw):
res = func(*args, **kw)
- if isinstance(res, futures.Future) or inspect.isgenerator(res) or \
- isinstance(res, CoroWrapper):
+ if (futures.isfuture(res) or inspect.isgenerator(res) or
+ isinstance(res, CoroWrapper)):
res = yield from res
elif _AwaitableABC is not None:
# If 'func' returns an Awaitable (new in 3.5) we
self.loop.call_exception_handler({'message': msg})
+def isfuture(obj):
+ """Check for a Future.
+
+ This returns True when obj is a Future instance or is advertising
+ itself as duck-type compatible by setting _asyncio_future_blocking.
+ See comment in Future for more details.
+ """
+ return getattr(obj, '_asyncio_future_blocking', None) is not None
+
+
class Future:
"""This class is *almost* compatible with concurrent.futures.Future.
If destination is cancelled, source gets cancelled too.
Compatible with both asyncio.Future and concurrent.futures.Future.
"""
- if not isinstance(source, (Future, concurrent.futures.Future)):
+ if not isfuture(source) and not isinstance(source,
+ concurrent.futures.Future):
raise TypeError('A future is required for source argument')
- if not isinstance(destination, (Future, concurrent.futures.Future)):
+ if not isfuture(destination) and not isinstance(destination,
+ concurrent.futures.Future):
raise TypeError('A future is required for destination argument')
- source_loop = source._loop if isinstance(source, Future) else None
- dest_loop = destination._loop if isinstance(destination, Future) else None
+ source_loop = source._loop if isfuture(source) else None
+ dest_loop = destination._loop if isfuture(destination) else None
def _set_state(future, other):
- if isinstance(future, Future):
+ if isfuture(future):
_copy_future_state(other, future)
else:
_set_concurrent_future_state(future, other)
def wrap_future(future, *, loop=None):
"""Wrap concurrent.futures.Future object."""
- if isinstance(future, Future):
+ if isfuture(future):
return future
assert isinstance(future, concurrent.futures.Future), \
'concurrent.futures.Future is expected, got {!r}'.format(future)
Note: This does not raise TimeoutError! Futures that aren't done
when the timeout occurs are returned in the second set.
"""
- if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+ if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
Note: The futures 'f' are not necessarily members of fs.
"""
- if isinstance(fs, futures.Future) or coroutines.iscoroutine(fs):
+ if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError("expect a list of futures, not %s" % type(fs).__name__)
loop = loop if loop is not None else events.get_event_loop()
todo = {ensure_future(f, loop=loop) for f in set(fs)}
If the argument is a Future, it is returned directly.
"""
- if isinstance(coro_or_future, futures.Future):
+ if futures.isfuture(coro_or_future):
if loop is not None and loop is not coro_or_future._loop:
raise ValueError('loop argument must agree with Future')
return coro_or_future
arg_to_fut = {}
for arg in set(coros_or_futures):
- if not isinstance(arg, futures.Future):
+ if not futures.isfuture(arg):
fut = ensure_future(arg, loop=loop)
if loop is None:
loop = fut._loop
loop.connect_accepted_socket(
(lambda : proto), conn, ssl=server_ssl))
loop.run_forever()
- conn.close()
+ proto.transport.close()
lsock.close()
thread.join(1)
pass
+class DuckFuture:
+ # Class that does not inherit from Future but aims to be duck-type
+ # compatible with it.
+
+ _asyncio_future_blocking = False
+ __cancelled = False
+ __result = None
+ __exception = None
+
+ def cancel(self):
+ if self.done():
+ return False
+ self.__cancelled = True
+ return True
+
+ def cancelled(self):
+ return self.__cancelled
+
+ def done(self):
+ return (self.__cancelled
+ or self.__result is not None
+ or self.__exception is not None)
+
+ def result(self):
+ assert not self.cancelled()
+ if self.__exception is not None:
+ raise self.__exception
+ return self.__result
+
+ def exception(self):
+ assert not self.cancelled()
+ return self.__exception
+
+ def set_result(self, result):
+ assert not self.done()
+ assert result is not None
+ self.__result = result
+
+ def set_exception(self, exception):
+ assert not self.done()
+ assert exception is not None
+ self.__exception = exception
+
+ def __iter__(self):
+ if not self.done():
+ self._asyncio_future_blocking = True
+ yield self
+ assert self.done()
+ return self.result()
+
+
+class DuckTests(test_utils.TestCase):
+
+ def setUp(self):
+ self.loop = self.new_test_loop()
+ self.addCleanup(self.loop.close)
+
+ def test_wrap_future(self):
+ f = DuckFuture()
+ g = asyncio.wrap_future(f)
+ assert g is f
+
+ def test_ensure_future(self):
+ f = DuckFuture()
+ g = asyncio.ensure_future(f)
+ assert g is f
+
+
class FutureTests(test_utils.TestCase):
def setUp(self):