]> granicus.if.org Git - python/commitdiff
Issue #25304: Add asyncio.run_coroutine_threadsafe(). By Vincent Michel.
authorGuido van Rossum <guido@python.org>
Sat, 3 Oct 2015 15:31:42 +0000 (08:31 -0700)
committerGuido van Rossum <guido@python.org>
Sat, 3 Oct 2015 15:31:42 +0000 (08:31 -0700)
Lib/asyncio/futures.py
Lib/asyncio/tasks.py
Lib/test/test_asyncio/test_futures.py
Lib/test/test_asyncio/test_tasks.py
Misc/ACKS
Misc/NEWS

index dbe06c4a98b2031f118a5d8a9c2e7468ecfc90bf..166bc8047bfbd6f8554cc1571220900a3e50016b 100644 (file)
@@ -390,22 +390,64 @@ class Future:
         __await__ = __iter__ # make compatible with 'await' expression
 
 
-def wrap_future(fut, *, loop=None):
-    """Wrap concurrent.futures.Future object."""
-    if isinstance(fut, Future):
-        return fut
-    assert isinstance(fut, concurrent.futures.Future), \
-        'concurrent.futures.Future is expected, got {!r}'.format(fut)
-    if loop is None:
-        loop = events.get_event_loop()
-    new_future = Future(loop=loop)
+def _set_concurrent_future_state(concurrent, source):
+    """Copy state from a future to a concurrent.futures.Future."""
+    assert source.done()
+    if source.cancelled():
+        concurrent.cancel()
+    if not concurrent.set_running_or_notify_cancel():
+        return
+    exception = source.exception()
+    if exception is not None:
+        concurrent.set_exception(exception)
+    else:
+        result = source.result()
+        concurrent.set_result(result)
+
+
+def _chain_future(source, destination):
+    """Chain two futures so that when one completes, so does the other.
+
+    The result (or exception) of source will be copied to destination.
+    If destination is cancelled, source gets cancelled too.
+    Compatible with both asyncio.Future and concurrent.futures.Future.
+    """
+    if not isinstance(source, (Future, concurrent.futures.Future)):
+        raise TypeError('A future is required for source argument')
+    if not isinstance(destination, (Future, concurrent.futures.Future)):
+        raise TypeError('A future is required for destination argument')
+    source_loop = source._loop if isinstance(source, Future) else None
+    dest_loop = destination._loop if isinstance(destination, Future) else None
+
+    def _set_state(future, other):
+        if isinstance(future, Future):
+            future._copy_state(other)
+        else:
+            _set_concurrent_future_state(future, other)
 
-    def _check_cancel_other(f):
-        if f.cancelled():
-            fut.cancel()
+    def _call_check_cancel(destination):
+        if destination.cancelled():
+            if source_loop is None or source_loop is dest_loop:
+                source.cancel()
+            else:
+                source_loop.call_soon_threadsafe(source.cancel)
 
-    new_future.add_done_callback(_check_cancel_other)
-    fut.add_done_callback(
-        lambda future: loop.call_soon_threadsafe(
-            new_future._copy_state, future))
+    def _call_set_state(source):
+        if dest_loop is None or dest_loop is source_loop:
+            _set_state(destination, source)
+        else:
+            dest_loop.call_soon_threadsafe(_set_state, destination, source)
+
+    destination.add_done_callback(_call_check_cancel)
+    source.add_done_callback(_call_set_state)
+
+
+def wrap_future(future, *, loop=None):
+    """Wrap concurrent.futures.Future object."""
+    if isinstance(future, Future):
+        return future
+    assert isinstance(future, concurrent.futures.Future), \
+        'concurrent.futures.Future is expected, got {!r}'.format(future)
+    new_future = Future(loop=loop)
+    _chain_future(future, new_future)
     return new_future
index 434f498e47026fa0f0cdd6f5e53386da08b4793f..5a7bd9dbcb1d8cf8b44ba4f8655a68cb2206e452 100644 (file)
@@ -3,7 +3,7 @@
 __all__ = ['Task',
            'FIRST_COMPLETED', 'FIRST_EXCEPTION', 'ALL_COMPLETED',
            'wait', 'wait_for', 'as_completed', 'sleep', 'async',
-           'gather', 'shield', 'ensure_future',
+           'gather', 'shield', 'ensure_future', 'run_coroutine_threadsafe',
            ]
 
 import concurrent.futures
@@ -692,3 +692,19 @@ def shield(arg, *, loop=None):
 
     inner.add_done_callback(_done_callback)
     return outer
+
+
+def run_coroutine_threadsafe(coro, loop):
+    """Submit a coroutine object to a given event loop.
+
+    Return a concurrent.futures.Future to access the result.
+    """
+    if not coroutines.iscoroutine(coro):
+        raise TypeError('A coroutine object is required')
+    future = concurrent.futures.Future()
+
+    def callback():
+        futures._chain_future(ensure_future(coro, loop=loop), future)
+
+    loop.call_soon_threadsafe(callback)
+    return future
index c8b6829fb67c31cf2fddc167fa9ca0d9d7b92cf3..0bc0581d2816f9cbea969b02ef59a28922d50ccb 100644 (file)
@@ -174,8 +174,6 @@ class FutureTests(test_utils.TestCase):
                          '<Future cancelled>')
 
     def test_copy_state(self):
-        # Test the internal _copy_state method since it's being directly
-        # invoked in other modules.
         f = asyncio.Future(loop=self.loop)
         f.set_result(10)
 
index 16d3d9da129b25bdf4d4e833c850a3f15541efd5..8ec5d9c9fdce4d029f87cbbb93e0fd6cc06c5cac 100644 (file)
@@ -2100,5 +2100,72 @@ class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
         self.assertIsInstance(f.exception(), RuntimeError)
 
 
+class RunCoroutineThreadsafeTests(test_utils.TestCase):
+    """Test case for futures.submit_to_loop."""
+
+    def setUp(self):
+        self.loop = self.new_test_loop(self.time_gen)
+
+    def time_gen(self):
+        """Handle the timer."""
+        yield 0  # second
+        yield 1  # second
+
+    @asyncio.coroutine
+    def add(self, a, b, fail=False, cancel=False):
+        """Wait 1 second and return a + b."""
+        yield from asyncio.sleep(1, loop=self.loop)
+        if fail:
+            raise RuntimeError("Fail!")
+        if cancel:
+            asyncio.tasks.Task.current_task(self.loop).cancel()
+            yield
+        return a + b
+
+    def target(self, fail=False, cancel=False, timeout=None):
+        """Run add coroutine in the event loop."""
+        coro = self.add(1, 2, fail=fail, cancel=cancel)
+        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
+        try:
+            return future.result(timeout)
+        finally:
+            future.done() or future.cancel()
+
+    def test_run_coroutine_threadsafe(self):
+        """Test coroutine submission from a thread to an event loop."""
+        future = self.loop.run_in_executor(None, self.target)
+        result = self.loop.run_until_complete(future)
+        self.assertEqual(result, 3)
+
+    def test_run_coroutine_threadsafe_with_exception(self):
+        """Test coroutine submission from a thread to an event loop
+        when an exception is raised."""
+        future = self.loop.run_in_executor(None, self.target, True)
+        with self.assertRaises(RuntimeError) as exc_context:
+            self.loop.run_until_complete(future)
+        self.assertIn("Fail!", exc_context.exception.args)
+
+    def test_run_coroutine_threadsafe_with_timeout(self):
+        """Test coroutine submission from a thread to an event loop
+        when a timeout is raised."""
+        callback = lambda: self.target(timeout=0)
+        future = self.loop.run_in_executor(None, callback)
+        with self.assertRaises(asyncio.TimeoutError):
+            self.loop.run_until_complete(future)
+        # Clear the time generator and tasks
+        test_utils.run_briefly(self.loop)
+        # Check that there's no pending task (add has been cancelled)
+        for task in asyncio.Task.all_tasks(self.loop):
+            self.assertTrue(task.done())
+
+    def test_run_coroutine_threadsafe_task_cancelled(self):
+        """Test coroutine submission from a tread to an event loop
+        when the task is cancelled."""
+        callback = lambda: self.target(cancel=True)
+        future = self.loop.run_in_executor(None, callback)
+        with self.assertRaises(asyncio.CancelledError):
+            self.loop.run_until_complete(future)
+
+
 if __name__ == '__main__':
     unittest.main()
index 9e2c57de7afd464d9bfbc882e77f9c6cdf78ea46..cae34e6b369b94bcc020a20511e1e8983057362d 100644 (file)
--- a/Misc/ACKS
+++ b/Misc/ACKS
@@ -929,6 +929,7 @@ Steven Miale
 Trent Mick
 Jason Michalski
 Franck Michea
+Vincent Michel
 Tom Middleton
 Thomas Miedema
 Stan Mihai
index 1c8bc491e656947f6171aa19eb7553a7afe1b66f..d4bea39dc6106b7a9eb7a332d81d5ca50698f31e 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -90,6 +90,10 @@ Core and Builtins
 Library
 -------
 
+- Issue #25304: Add asyncio.run_coroutine_threadsafe().  This lets you
+  submit a coroutine to a loop from another thread, returning a
+  concurrent.futures.Future.  By Vincent Michel.
+
 - Issue #25232: Fix CGIRequestHandler to split the query from the URL at the
   first question mark (?) rather than the last. Patch from Xiang Zhang.