]> granicus.if.org Git - python/commitdiff
Issue #27041: asyncio: Add loop.create_future method
authorYury Selivanov <yselivanov@sprymix.com>
Mon, 16 May 2016 19:38:39 +0000 (15:38 -0400)
committerYury Selivanov <yselivanov@sprymix.com>
Mon, 16 May 2016 19:38:39 +0000 (15:38 -0400)
14 files changed:
Lib/asyncio/base_events.py
Lib/asyncio/base_subprocess.py
Lib/asyncio/events.py
Lib/asyncio/futures.py
Lib/asyncio/locks.py
Lib/asyncio/proactor_events.py
Lib/asyncio/queues.py
Lib/asyncio/selector_events.py
Lib/asyncio/streams.py
Lib/asyncio/tasks.py
Lib/asyncio/unix_events.py
Lib/asyncio/windows_events.py
Lib/test/test_asyncio/test_futures.py
Misc/NEWS

index ada178f37f4bf01fe2df26eb7a4083016a7563c2..313cc316e533e82e05106ee21b3d2161d9765dbc 100644 (file)
@@ -209,7 +209,7 @@ class Server(events.AbstractServer):
     def wait_closed(self):
         if self.sockets is None or self._waiters is None:
             return
-        waiter = futures.Future(loop=self._loop)
+        waiter = self._loop.create_future()
         self._waiters.append(waiter)
         yield from waiter
 
@@ -243,6 +243,10 @@ class BaseEventLoop(events.AbstractEventLoop):
                 % (self.__class__.__name__, self.is_running(),
                    self.is_closed(), self.get_debug()))
 
+    def create_future(self):
+        """Create a Future object attached to the loop."""
+        return futures.Future(loop=self)
+
     def create_task(self, coro):
         """Schedule a coroutine object.
 
@@ -536,7 +540,7 @@ class BaseEventLoop(events.AbstractEventLoop):
             assert not args
             assert not isinstance(func, events.TimerHandle)
             if func._cancelled:
-                f = futures.Future(loop=self)
+                f = self.create_future()
                 f.set_result(None)
                 return f
             func, args = func._callback, func._args
@@ -579,7 +583,7 @@ class BaseEventLoop(events.AbstractEventLoop):
                     family=0, type=0, proto=0, flags=0):
         info = _ipaddr_info(host, port, family, type, proto)
         if info is not None:
-            fut = futures.Future(loop=self)
+            fut = self.create_future()
             fut.set_result([info])
             return fut
         elif self._debug:
@@ -720,7 +724,7 @@ class BaseEventLoop(events.AbstractEventLoop):
     def _create_connection_transport(self, sock, protocol_factory, ssl,
                                      server_hostname):
         protocol = protocol_factory()
-        waiter = futures.Future(loop=self)
+        waiter = self.create_future()
         if ssl:
             sslcontext = None if isinstance(ssl, bool) else ssl
             transport = self._make_ssl_transport(
@@ -840,7 +844,7 @@ class BaseEventLoop(events.AbstractEventLoop):
                 raise exceptions[0]
 
         protocol = protocol_factory()
-        waiter = futures.Future(loop=self)
+        waiter = self.create_future()
         transport = self._make_datagram_transport(
             sock, protocol, r_addr, waiter)
         if self._debug:
@@ -979,7 +983,7 @@ class BaseEventLoop(events.AbstractEventLoop):
     @coroutine
     def connect_read_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
-        waiter = futures.Future(loop=self)
+        waiter = self.create_future()
         transport = self._make_read_pipe_transport(pipe, protocol, waiter)
 
         try:
@@ -996,7 +1000,7 @@ class BaseEventLoop(events.AbstractEventLoop):
     @coroutine
     def connect_write_pipe(self, protocol_factory, pipe):
         protocol = protocol_factory()
-        waiter = futures.Future(loop=self)
+        waiter = self.create_future()
         transport = self._make_write_pipe_transport(pipe, protocol, waiter)
 
         try:
index 73425d9bbcc9640b923ce7960a0412400cbf0224..08080bd70124928fda4e13562d4f7c5bec61d9d8 100644 (file)
@@ -227,7 +227,7 @@ class BaseSubprocessTransport(transports.SubprocessTransport):
         if self._returncode is not None:
             return self._returncode
 
-        waiter = futures.Future(loop=self._loop)
+        waiter = self._loop.create_future()
         self._exit_waiters.append(waiter)
         return (yield from waiter)
 
index 8358ebfd4f153b3854c67bd9fa5cc228da6e84c3..c48c5bed736026301cd64e26a88c3b178ac0a491 100644 (file)
@@ -266,6 +266,9 @@ class AbstractEventLoop:
     def time(self):
         raise NotImplementedError
 
+    def create_future(self):
+        raise NotImplementedError
+
     # Method scheduling a coroutine object: create a task.
 
     def create_task(self, coro):
index ddb9cde188395ceb22537287c85998342f18c95c..1feba4d370070d83068acca374b66cd269bfcadb 100644 (file)
@@ -451,6 +451,8 @@ def wrap_future(future, *, loop=None):
         return future
     assert isinstance(future, concurrent.futures.Future), \
         'concurrent.futures.Future is expected, got {!r}'.format(future)
-    new_future = Future(loop=loop)
+    if loop is None:
+        loop = events.get_event_loop()
+    new_future = loop.create_future()
     _chain_future(future, new_future)
     return new_future
index 34f6bc16ad87b69743d0918bfc5e779d25d2bdc7..1804d7b86458157e9d027b6bee61996811edd39f 100644 (file)
@@ -170,7 +170,7 @@ class Lock(_ContextManagerMixin):
             self._locked = True
             return True
 
-        fut = futures.Future(loop=self._loop)
+        fut = self._loop.create_future()
         self._waiters.append(fut)
         try:
             yield from fut
@@ -258,7 +258,7 @@ class Event:
         if self._value:
             return True
 
-        fut = futures.Future(loop=self._loop)
+        fut = self._loop.create_future()
         self._waiters.append(fut)
         try:
             yield from fut
@@ -320,7 +320,7 @@ class Condition(_ContextManagerMixin):
 
         self.release()
         try:
-            fut = futures.Future(loop=self._loop)
+            fut = self._loop.create_future()
             self._waiters.append(fut)
             try:
                 yield from fut
@@ -433,7 +433,7 @@ class Semaphore(_ContextManagerMixin):
         True.
         """
         while self._value <= 0:
-            fut = futures.Future(loop=self._loop)
+            fut = self._loop.create_future()
             self._waiters.append(fut)
             try:
                 yield from fut
index b2ddee40718ec1684634057280dee423323fee0c..eb92458adae511b221bbc6b842b0284e85c934c4 100644 (file)
@@ -443,7 +443,7 @@ class BaseProactorEventLoop(base_events.BaseEventLoop):
         try:
             base_events._check_resolved_address(sock, address)
         except ValueError as err:
-            fut = futures.Future(loop=self)
+            fut = self.create_future()
             fut.set_exception(err)
             return fut
         else:
index e3a1d5ed60e3b8686ce9fce40c7f12f2a585d517..c453f02d8cf89973b39d6a46f332eeead621e4dc 100644 (file)
@@ -128,7 +128,7 @@ class Queue:
         This method is a coroutine.
         """
         while self.full():
-            putter = futures.Future(loop=self._loop)
+            putter = self._loop.create_future()
             self._putters.append(putter)
             try:
                 yield from putter
@@ -162,7 +162,7 @@ class Queue:
         This method is a coroutine.
         """
         while self.empty():
-            getter = futures.Future(loop=self._loop)
+            getter = self._loop.create_future()
             self._getters.append(getter)
             try:
                 yield from getter
index 7b5a084daec99e32fd9bd727cb98a887c6254984..b34fee34df8f13bce2f97d85ca253cc3f0301041 100644 (file)
@@ -196,7 +196,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         transport = None
         try:
             protocol = protocol_factory()
-            waiter = futures.Future(loop=self)
+            waiter = self.create_future()
             if sslcontext:
                 transport = self._make_ssl_transport(
                     conn, protocol, sslcontext, waiter=waiter,
@@ -314,7 +314,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         """
         if self._debug and sock.gettimeout() != 0:
             raise ValueError("the socket must be non-blocking")
-        fut = futures.Future(loop=self)
+        fut = self.create_future()
         self._sock_recv(fut, False, sock, n)
         return fut
 
@@ -352,7 +352,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         """
         if self._debug and sock.gettimeout() != 0:
             raise ValueError("the socket must be non-blocking")
-        fut = futures.Future(loop=self)
+        fut = self.create_future()
         if data:
             self._sock_sendall(fut, False, sock, data)
         else:
@@ -395,7 +395,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         """
         if self._debug and sock.gettimeout() != 0:
             raise ValueError("the socket must be non-blocking")
-        fut = futures.Future(loop=self)
+        fut = self.create_future()
         try:
             base_events._check_resolved_address(sock, address)
         except ValueError as err:
@@ -453,7 +453,7 @@ class BaseSelectorEventLoop(base_events.BaseEventLoop):
         """
         if self._debug and sock.gettimeout() != 0:
             raise ValueError("the socket must be non-blocking")
-        fut = futures.Future(loop=self)
+        fut = self.create_future()
         self._sock_accept(fut, False, sock)
         return fut
 
index b7b0485aa0bf0cf6da4b16b0864dc7286709033c..da3d5263f2b520ee70fdec059c999138334131fb 100644 (file)
@@ -210,7 +210,7 @@ class FlowControlMixin(protocols.Protocol):
             return
         waiter = self._drain_waiter
         assert waiter is None or waiter.cancelled()
-        waiter = futures.Future(loop=self._loop)
+        waiter = self._loop.create_future()
         self._drain_waiter = waiter
         yield from waiter
 
@@ -449,7 +449,7 @@ class StreamReader:
             self._paused = False
             self._transport.resume_reading()
 
-        self._waiter = futures.Future(loop=self._loop)
+        self._waiter = self._loop.create_future()
         try:
             yield from self._waiter
         finally:
index cab4998ee44956152d8e5b6d58459f79b0a53d57..81510ba8d49006c5847a035262e1dfd0ed82246d 100644 (file)
@@ -373,7 +373,7 @@ def wait_for(fut, timeout, *, loop=None):
     if timeout is None:
         return (yield from fut)
 
-    waiter = futures.Future(loop=loop)
+    waiter = loop.create_future()
     timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
     cb = functools.partial(_release_waiter, waiter)
 
@@ -406,7 +406,7 @@ def _wait(fs, timeout, return_when, loop):
     The fs argument must be a collection of Futures.
     """
     assert fs, 'Set of Futures is empty.'
-    waiter = futures.Future(loop=loop)
+    waiter = loop.create_future()
     timeout_handle = None
     if timeout is not None:
         timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
@@ -507,7 +507,9 @@ def sleep(delay, result=None, *, loop=None):
         yield
         return result
 
-    future = futures.Future(loop=loop)
+    if loop is None:
+        loop = events.get_event_loop()
+    future = loop.create_future()
     h = future._loop.call_later(delay,
                                 futures._set_result_unless_cancelled,
                                 future, result)
@@ -604,7 +606,9 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
     be cancelled.)
     """
     if not coros_or_futures:
-        outer = futures.Future(loop=loop)
+        if loop is None:
+            loop = events.get_event_loop()
+        outer = loop.create_future()
         outer.set_result([])
         return outer
 
@@ -692,7 +696,7 @@ def shield(arg, *, loop=None):
         # Shortcut.
         return inner
     loop = inner._loop
-    outer = futures.Future(loop=loop)
+    outer = loop.create_future()
 
     def _done_callback(inner):
         if outer.cancelled():
index b62dd3896d9c411bd602e9ed57b7fc5135cf8d2c..d712749ee59cdcbbee0519851f8d324b42802dd3 100644 (file)
@@ -177,7 +177,7 @@ class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
         with events.get_child_watcher() as watcher:
-            waiter = futures.Future(loop=self)
+            waiter = self.create_future()
             transp = _UnixSubprocessTransport(self, protocol, args, shell,
                                               stdin, stdout, stderr, bufsize,
                                               waiter=waiter, extra=extra,
index 7be3e02232767e3fe6b7395e76331c9945a1d7ca..668fe1451b65ac034021e08671402f789393c911 100644 (file)
@@ -366,7 +366,7 @@ class ProactorEventLoop(proactor_events.BaseProactorEventLoop):
     def _make_subprocess_transport(self, protocol, args, shell,
                                    stdin, stdout, stderr, bufsize,
                                    extra=None, **kwargs):
-        waiter = futures.Future(loop=self)
+        waiter = self.create_future()
         transp = _WindowsSubprocessTransport(self, protocol, args, shell,
                                              stdin, stdout, stderr, bufsize,
                                              waiter=waiter, extra=extra,
@@ -417,7 +417,7 @@ class IocpProactor:
         return tmp
 
     def _result(self, value):
-        fut = futures.Future(loop=self._loop)
+        fut = self._loop.create_future()
         fut.set_result(value)
         return fut
 
index e80010623b27a35131f54504d9a4b4f881e5aefd..c38c1f29e7fe9f35925e517eacc0718e787ba84e 100644 (file)
@@ -278,14 +278,15 @@ class FutureTests(test_utils.TestCase):
         f2 = asyncio.wrap_future(f1)
         self.assertIs(f1, f2)
 
-    @mock.patch('asyncio.futures.events')
-    def test_wrap_future_use_global_loop(self, m_events):
-        def run(arg):
-            return (arg, threading.get_ident())
-        ex = concurrent.futures.ThreadPoolExecutor(1)
-        f1 = ex.submit(run, 'oi')
-        f2 = asyncio.wrap_future(f1)
-        self.assertIs(m_events.get_event_loop.return_value, f2._loop)
+    def test_wrap_future_use_global_loop(self):
+        with mock.patch('asyncio.futures.events') as events:
+            events.get_event_loop = lambda: self.loop
+            def run(arg):
+                return (arg, threading.get_ident())
+            ex = concurrent.futures.ThreadPoolExecutor(1)
+            f1 = ex.submit(run, 'oi')
+            f2 = asyncio.wrap_future(f1)
+            self.assertIs(self.loop, f2._loop)
 
     def test_wrap_future_cancel(self):
         f1 = concurrent.futures.Future()
index abffd52c0dd433eb705e7d18d1a240080d469aaf..36981d2a89ac66ac36323400b6f1e2430676c1cb 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -456,6 +456,8 @@ Library
 
 - Issue #27040: Add loop.get_exception_handler method
 
+- Issue #27041: asyncio: Add loop.create_future method
+
 Documentation
 -------------