]> granicus.if.org Git - python/commitdiff
bpo-32415: Add asyncio.Task.get_loop() and Future.get_loop() (#4992)
authorYury Selivanov <yury@magic.io>
Sat, 23 Dec 2017 20:04:15 +0000 (15:04 -0500)
committerGitHub <noreply@github.com>
Sat, 23 Dec 2017 20:04:15 +0000 (15:04 -0500)
Doc/library/asyncio-task.rst
Lib/asyncio/base_events.py
Lib/asyncio/futures.py
Lib/asyncio/tasks.py
Lib/test/test_asyncio/test_futures.py
Lib/test/test_asyncio/test_tasks.py
Misc/NEWS.d/next/Library/2017-12-23-12-45-00.bpo-32415.YufXTU.rst [new file with mode: 0644]
Modules/_asynciomodule.c
Modules/clinic/_asynciomodule.c.h

index d85dddfa02e92a201d384f95382fb8143d8ac125..71dbe06c899f1845336a2891cbf0065a45f7173c 100644 (file)
@@ -306,6 +306,12 @@ Future
       If the future is already done when this method is called, raises
       :exc:`InvalidStateError`.
 
+   .. method:: get_loop()
+
+      Return the event loop the future object is bound to.
+
+      .. versionadded:: 3.7
+
 
 Example: Future with run_until_complete()
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
index 2ab8a76e0c5ae2d18dbab2d3327208e7c6ed3f26..96cc4f02588ac8f8954d1743c7069d303a69b24b 100644 (file)
@@ -145,7 +145,7 @@ def _run_until_complete_cb(fut):
             # Issue #22429: run_forever() already finished, no need to
             # stop it.
             return
-    fut._loop.stop()
+    futures._get_loop(fut).stop()
 
 
 class Server(events.AbstractServer):
index b310962f9fe19d6be0a509fde1c2ef3ee0a50276..24843c016a7e07e501d9fbb0892de33cc7fb7740 100644 (file)
@@ -105,6 +105,10 @@ class Future:
             context['source_traceback'] = self._source_traceback
         self._loop.call_exception_handler(context)
 
+    def get_loop(self):
+        """Return the event loop the Future is bound to."""
+        return self._loop
+
     def cancel(self):
         """Cancel the future and schedule callbacks.
 
@@ -249,6 +253,18 @@ class Future:
 _PyFuture = Future
 
 
+def _get_loop(fut):
+    # Tries to call Future.get_loop() if it's available.
+    # Otherwise fallbacks to using the old '_loop' property.
+    try:
+        get_loop = fut.get_loop
+    except AttributeError:
+        pass
+    else:
+        return get_loop()
+    return fut._loop
+
+
 def _set_result_unless_cancelled(fut, result):
     """Helper setting the result only if the future was not cancelled."""
     if fut.cancelled():
@@ -304,8 +320,8 @@ def _chain_future(source, destination):
     if not isfuture(destination) and not isinstance(destination,
                                                     concurrent.futures.Future):
         raise TypeError('A future is required for destination argument')
-    source_loop = source._loop if isfuture(source) else None
-    dest_loop = destination._loop if isfuture(destination) else None
+    source_loop = _get_loop(source) if isfuture(source) else None
+    dest_loop = _get_loop(destination) if isfuture(destination) else None
 
     def _set_state(future, other):
         if isfuture(future):
index ff8a486b544c942f8a5f49fc1bd2139d086d2b3b..572e7073338e79041cd763d346548bfcf37958fb 100644 (file)
@@ -34,7 +34,7 @@ def all_tasks(loop=None):
     """Return a set of all tasks for the loop."""
     if loop is None:
         loop = events.get_event_loop()
-    return {t for t, l in _all_tasks.items() if l is loop}
+    return {t for t in _all_tasks if futures._get_loop(t) is loop}
 
 
 class Task(futures.Future):
@@ -96,7 +96,7 @@ class Task(futures.Future):
         self._coro = coro
 
         self._loop.call_soon(self._step)
-        _register_task(self._loop, self)
+        _register_task(self)
 
     def __del__(self):
         if self._state == futures._PENDING and self._log_destroy_pending:
@@ -215,7 +215,7 @@ class Task(futures.Future):
             blocking = getattr(result, '_asyncio_future_blocking', None)
             if blocking is not None:
                 # Yielded Future must come from Future.__iter__().
-                if result._loop is not self._loop:
+                if futures._get_loop(result) is not self._loop:
                     new_exc = RuntimeError(
                         f'Task {self!r} got Future '
                         f'{result!r} attached to a different loop')
@@ -510,9 +510,9 @@ async def sleep(delay, result=None, *, loop=None):
     if loop is None:
         loop = events.get_event_loop()
     future = loop.create_future()
-    h = future._loop.call_later(delay,
-                                futures._set_result_unless_cancelled,
-                                future, result)
+    h = loop.call_later(delay,
+                        futures._set_result_unless_cancelled,
+                        future, result)
     try:
         return await future
     finally:
@@ -525,7 +525,7 @@ def ensure_future(coro_or_future, *, loop=None):
     If the argument is a Future, it is returned directly.
     """
     if futures.isfuture(coro_or_future):
-        if loop is not None and loop is not coro_or_future._loop:
+        if loop is not None and loop is not futures._get_loop(coro_or_future):
             raise ValueError('loop argument must agree with Future')
         return coro_or_future
     elif coroutines.iscoroutine(coro_or_future):
@@ -655,7 +655,7 @@ def gather(*coros_or_futures, loop=None, return_exceptions=False):
         if arg not in arg_to_fut:
             fut = ensure_future(arg, loop=loop)
             if loop is None:
-                loop = fut._loop
+                loop = futures._get_loop(fut)
             if fut is not arg:
                 # 'arg' was not a Future, therefore, 'fut' is a new
                 # Future created specifically for 'arg'.  Since the caller
@@ -707,7 +707,7 @@ def shield(arg, *, loop=None):
     if inner.done():
         # Shortcut.
         return inner
-    loop = inner._loop
+    loop = futures._get_loop(inner)
     outer = loop.create_future()
 
     def _done_callback(inner):
@@ -751,23 +751,17 @@ def run_coroutine_threadsafe(coro, loop):
     return future
 
 
-# WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
-# Task should be a weak reference to remove entry on task garbage
-# collection, EventLoop is required
-# to not access to private task._loop attribute.
-_all_tasks = weakref.WeakKeyDictionary()
+# WeakSet containing all alive tasks.
+_all_tasks = weakref.WeakSet()
 
 # Dictionary containing tasks that are currently active in
 # all running event loops.  {EventLoop: Task}
 _current_tasks = {}
 
 
-def _register_task(loop, task):
-    """Register a new task in asyncio as executed by loop.
-
-    Returns None.
-    """
-    _all_tasks[task] = loop
+def _register_task(task):
+    """Register a new task in asyncio as executed by loop."""
+    _all_tasks.add(task)
 
 
 def _enter_task(loop, task):
@@ -786,8 +780,9 @@ def _leave_task(loop, task):
     del _current_tasks[loop]
 
 
-def _unregister_task(loop, task):
-    _all_tasks.pop(task, None)
+def _unregister_task(task):
+    """Unregister a task."""
+    _all_tasks.discard(task)
 
 
 _py_register_task = _register_task
index 5652a42690ed44d8fcce946e06f127288dabd6eb..f777a420b296a26d9bac2be2ac0b927c9da2d9dc 100644 (file)
@@ -139,6 +139,7 @@ class BaseFutureTests:
         asyncio.set_event_loop(self.loop)
         f = self._new_future()
         self.assertIs(f._loop, self.loop)
+        self.assertIs(f.get_loop(), self.loop)
 
     def test_constructor_positional(self):
         # Make sure Future doesn't accept a positional argument
index f1dbb99d4fce16cd4c0064e1f032f745497c78af..84669cd6c7ec87b3c87a0897d10021ea94b6ca9d 100644 (file)
@@ -141,6 +141,7 @@ class BaseTaskTests:
         self.assertTrue(t.done())
         self.assertEqual(t.result(), 'ok')
         self.assertIs(t._loop, self.loop)
+        self.assertIs(t.get_loop(), self.loop)
 
         loop = asyncio.new_event_loop()
         self.set_event_loop(loop)
@@ -2310,10 +2311,11 @@ class BaseTaskIntrospectionTests:
     def test__register_task(self):
         task = mock.Mock()
         loop = mock.Mock()
+        task.get_loop = lambda: loop
         self.assertEqual(asyncio.all_tasks(loop), set())
-        self._register_task(loop, task)
+        self._register_task(task)
         self.assertEqual(asyncio.all_tasks(loop), {task})
-        self._unregister_task(loop, task)
+        self._unregister_task(task)
 
     def test__enter_task(self):
         task = mock.Mock()
@@ -2360,14 +2362,15 @@ class BaseTaskIntrospectionTests:
     def test__unregister_task(self):
         task = mock.Mock()
         loop = mock.Mock()
-        self._register_task(loop, task)
-        self._unregister_task(loop, task)
+        task.get_loop = lambda: loop
+        self._register_task(task)
+        self._unregister_task(task)
         self.assertEqual(asyncio.all_tasks(loop), set())
 
     def test__unregister_task_not_registered(self):
         task = mock.Mock()
         loop = mock.Mock()
-        self._unregister_task(loop, task)
+        self._unregister_task(task)
         self.assertEqual(asyncio.all_tasks(loop), set())
 
 
diff --git a/Misc/NEWS.d/next/Library/2017-12-23-12-45-00.bpo-32415.YufXTU.rst b/Misc/NEWS.d/next/Library/2017-12-23-12-45-00.bpo-32415.YufXTU.rst
new file mode 100644 (file)
index 0000000..f1f5737
--- /dev/null
@@ -0,0 +1 @@
+asyncio: Add Task.get_loop() and Future.get_loop()
index f52297d33f2639634ea2f9197d0905c06924fb30..25acd552b186222ad2169dfd5e183b60b803e807 100644 (file)
@@ -16,7 +16,6 @@ _Py_IDENTIFIER(call_soon);
 _Py_IDENTIFIER(cancel);
 _Py_IDENTIFIER(current_task);
 _Py_IDENTIFIER(get_event_loop);
-_Py_IDENTIFIER(pop);
 _Py_IDENTIFIER(send);
 _Py_IDENTIFIER(throw);
 _Py_IDENTIFIER(_step);
@@ -39,15 +38,12 @@ static PyObject *asyncio_InvalidStateError;
 static PyObject *asyncio_CancelledError;
 
 
-/* WeakKeyDictionary of {Task: EventLoop} containing all tasks alive.
-   Task should be a weak reference to remove entry on task garbage
-   collection, EventLoop is required
-   to not access to private task._loop attribute. */
-static PyObject *current_tasks;
+/* WeakSet containing all alive tasks. */
+static PyObject *all_tasks;
 
 /* Dictionary containing tasks that are currently active in
    all running event loops.  {EventLoop: Task} */
-static PyObject *all_tasks;
+static PyObject *current_tasks;
 
 /* An isinstance type cache for the 'is_coroutine()' function. */
 static PyObject *iscoroutine_typecache;
@@ -186,6 +182,31 @@ is_coroutine(PyObject *coro)
 }
 
 
+static PyObject *
+get_future_loop(PyObject *fut)
+{
+    /* Implementation of `asyncio.futures._get_loop` */
+
+    _Py_IDENTIFIER(get_loop);
+    _Py_IDENTIFIER(_loop);
+
+    if (Future_CheckExact(fut) || Task_CheckExact(fut)) {
+        PyObject *loop = ((FutureObj *)fut)->fut_loop;
+        Py_INCREF(loop);
+        return loop;
+    }
+
+    PyObject *getloop = _PyObject_GetAttrId(fut, &PyId_get_loop);
+    if (getloop != NULL) {
+        PyObject *res = _PyObject_CallNoArg(getloop);
+        Py_DECREF(getloop);
+        return res;
+    }
+
+    return _PyObject_GetAttrId(fut, &PyId__loop);
+}
+
+
 static int
 get_running_loop(PyObject **loop)
 {
@@ -977,6 +998,20 @@ _asyncio_Future_done_impl(FutureObj *self)
     }
 }
 
+/*[clinic input]
+_asyncio.Future.get_loop
+
+Return the event loop the Future is bound to.
+[clinic start generated code]*/
+
+static PyObject *
+_asyncio_Future_get_loop_impl(FutureObj *self)
+/*[clinic end generated code: output=119b6ea0c9816c3f input=cba48c2136c79d1f]*/
+{
+    Py_INCREF(self->fut_loop);
+    return self->fut_loop;
+}
+
 static PyObject *
 FutureObj_get_blocking(FutureObj *fut)
 {
@@ -1295,6 +1330,7 @@ static PyMethodDef FutureType_methods[] = {
     _ASYNCIO_FUTURE_CANCEL_METHODDEF
     _ASYNCIO_FUTURE_CANCELLED_METHODDEF
     _ASYNCIO_FUTURE_DONE_METHODDEF
+    _ASYNCIO_FUTURE_GET_LOOP_METHODDEF
     _ASYNCIO_FUTURE__REPR_INFO_METHODDEF
     _ASYNCIO_FUTURE__SCHEDULE_CALLBACKS_METHODDEF
     {NULL, NULL}        /* Sentinel */
@@ -1759,19 +1795,27 @@ TaskWakeupMethWrapper_new(TaskObj *task)
 /* ----- Task introspection helpers */
 
 static int
-register_task(PyObject *loop, PyObject *task)
+register_task(PyObject *task)
 {
-    return PyObject_SetItem(all_tasks, task, loop);
+    _Py_IDENTIFIER(add);
+
+    PyObject *res = _PyObject_CallMethodIdObjArgs(
+        all_tasks, &PyId_add, task, NULL);
+    if (res == NULL) {
+        return -1;
+    }
+    Py_DECREF(res);
+    return 0;
 }
 
 
 static int
-unregister_task(PyObject *loop, PyObject *task)
+unregister_task(PyObject *task)
 {
-    PyObject *res;
+    _Py_IDENTIFIER(discard);
 
-    res = _PyObject_CallMethodIdObjArgs(all_tasks, &PyId_pop,
-                                        task, Py_None, NULL);
+    PyObject *res = _PyObject_CallMethodIdObjArgs(
+        all_tasks, &PyId_discard, task, NULL);
     if (res == NULL) {
         return -1;
     }
@@ -1871,7 +1915,7 @@ _asyncio_Task___init___impl(TaskObj *self, PyObject *coro, PyObject *loop)
     if (task_call_step_soon(self, NULL)) {
         return -1;
     }
-    return register_task(self->task_loop, (PyObject*)self);
+    return register_task((PyObject*)self);
 }
 
 static int
@@ -2622,7 +2666,7 @@ set_exception:
             }
 
             /* Check if `result` future is attached to a different loop */
-            PyObject *oloop = PyObject_GetAttrString(result, "_loop");
+            PyObject *oloop = get_future_loop(result);
             if (oloop == NULL) {
                 goto fail;
             }
@@ -2928,7 +2972,6 @@ _asyncio_get_running_loop_impl(PyObject *module)
 /*[clinic input]
 _asyncio._register_task
 
-    loop: object
     task: object
 
 Register a new task in asyncio as executed by loop.
@@ -2937,11 +2980,10 @@ Returns None.
 [clinic start generated code]*/
 
 static PyObject *
-_asyncio__register_task_impl(PyObject *module, PyObject *loop,
-                             PyObject *task)
-/*[clinic end generated code: output=54c5cb733dbe0f38 input=9b5fee38fcb2c288]*/
+_asyncio__register_task_impl(PyObject *module, PyObject *task)
+/*[clinic end generated code: output=8672dadd69a7d4e2 input=21075aaea14dfbad]*/
 {
-    if (register_task(loop, task) < 0) {
+    if (register_task(task) < 0) {
         return NULL;
     }
     Py_RETURN_NONE;
@@ -2951,7 +2993,6 @@ _asyncio__register_task_impl(PyObject *module, PyObject *loop,
 /*[clinic input]
 _asyncio._unregister_task
 
-    loop: object
     task: object
 
 Unregister a task.
@@ -2960,11 +3001,10 @@ Returns None.
 [clinic start generated code]*/
 
 static PyObject *
-_asyncio__unregister_task_impl(PyObject *module, PyObject *loop,
-                               PyObject *task)
-/*[clinic end generated code: output=f634743a76b84ebc input=51fa1820634ef331]*/
+_asyncio__unregister_task_impl(PyObject *module, PyObject *task)
+/*[clinic end generated code: output=6e5585706d568a46 input=28fb98c3975f7bdc]*/
 {
-    if (unregister_task(loop, task) < 0) {
+    if (unregister_task(task) < 0) {
         return NULL;
     }
     Py_RETURN_NONE;
@@ -3123,11 +3163,11 @@ module_init(void)
     WITH_MOD("traceback")
     GET_MOD_ATTR(traceback_extract_stack, "extract_stack")
 
-    PyObject *weak_key_dict;
+    PyObject *weak_set;
     WITH_MOD("weakref")
-    GET_MOD_ATTR(weak_key_dict, "WeakKeyDictionary");
-    all_tasks = _PyObject_CallNoArg(weak_key_dict);
-    Py_CLEAR(weak_key_dict);
+    GET_MOD_ATTR(weak_set, "WeakSet");
+    all_tasks = _PyObject_CallNoArg(weak_set);
+    Py_CLEAR(weak_set);
     if (all_tasks == NULL) {
         goto fail;
     }
index 9d5dea52c8e1e12db1c38e754e3b1735ddf6aa19..6a35434ce3ba5c7caf71edd5ba60fe895eaeb538 100644 (file)
@@ -194,6 +194,24 @@ _asyncio_Future_done(FutureObj *self, PyObject *Py_UNUSED(ignored))
     return _asyncio_Future_done_impl(self);
 }
 
+PyDoc_STRVAR(_asyncio_Future_get_loop__doc__,
+"get_loop($self, /)\n"
+"--\n"
+"\n"
+"Return the event loop the Future is bound to.");
+
+#define _ASYNCIO_FUTURE_GET_LOOP_METHODDEF    \
+    {"get_loop", (PyCFunction)_asyncio_Future_get_loop, METH_NOARGS, _asyncio_Future_get_loop__doc__},
+
+static PyObject *
+_asyncio_Future_get_loop_impl(FutureObj *self);
+
+static PyObject *
+_asyncio_Future_get_loop(FutureObj *self, PyObject *Py_UNUSED(ignored))
+{
+    return _asyncio_Future_get_loop_impl(self);
+}
+
 PyDoc_STRVAR(_asyncio_Future__repr_info__doc__,
 "_repr_info($self, /)\n"
 "--\n"
@@ -597,7 +615,7 @@ _asyncio_get_running_loop(PyObject *module, PyObject *Py_UNUSED(ignored))
 }
 
 PyDoc_STRVAR(_asyncio__register_task__doc__,
-"_register_task($module, /, loop, task)\n"
+"_register_task($module, /, task)\n"
 "--\n"
 "\n"
 "Register a new task in asyncio as executed by loop.\n"
@@ -608,30 +626,28 @@ PyDoc_STRVAR(_asyncio__register_task__doc__,
     {"_register_task", (PyCFunction)_asyncio__register_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__register_task__doc__},
 
 static PyObject *
-_asyncio__register_task_impl(PyObject *module, PyObject *loop,
-                             PyObject *task);
+_asyncio__register_task_impl(PyObject *module, PyObject *task);
 
 static PyObject *
 _asyncio__register_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
 {
     PyObject *return_value = NULL;
-    static const char * const _keywords[] = {"loop", "task", NULL};
-    static _PyArg_Parser _parser = {"OO:_register_task", _keywords, 0};
-    PyObject *loop;
+    static const char * const _keywords[] = {"task", NULL};
+    static _PyArg_Parser _parser = {"O:_register_task", _keywords, 0};
     PyObject *task;
 
     if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
-        &loop, &task)) {
+        &task)) {
         goto exit;
     }
-    return_value = _asyncio__register_task_impl(module, loop, task);
+    return_value = _asyncio__register_task_impl(module, task);
 
 exit:
     return return_value;
 }
 
 PyDoc_STRVAR(_asyncio__unregister_task__doc__,
-"_unregister_task($module, /, loop, task)\n"
+"_unregister_task($module, /, task)\n"
 "--\n"
 "\n"
 "Unregister a task.\n"
@@ -642,23 +658,21 @@ PyDoc_STRVAR(_asyncio__unregister_task__doc__,
     {"_unregister_task", (PyCFunction)_asyncio__unregister_task, METH_FASTCALL|METH_KEYWORDS, _asyncio__unregister_task__doc__},
 
 static PyObject *
-_asyncio__unregister_task_impl(PyObject *module, PyObject *loop,
-                               PyObject *task);
+_asyncio__unregister_task_impl(PyObject *module, PyObject *task);
 
 static PyObject *
 _asyncio__unregister_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
 {
     PyObject *return_value = NULL;
-    static const char * const _keywords[] = {"loop", "task", NULL};
-    static _PyArg_Parser _parser = {"OO:_unregister_task", _keywords, 0};
-    PyObject *loop;
+    static const char * const _keywords[] = {"task", NULL};
+    static _PyArg_Parser _parser = {"O:_unregister_task", _keywords, 0};
     PyObject *task;
 
     if (!_PyArg_ParseStackAndKeywords(args, nargs, kwnames, &_parser,
-        &loop, &task)) {
+        &task)) {
         goto exit;
     }
-    return_value = _asyncio__unregister_task_impl(module, loop, task);
+    return_value = _asyncio__unregister_task_impl(module, task);
 
 exit:
     return return_value;
@@ -733,4 +747,4 @@ _asyncio__leave_task(PyObject *module, PyObject *const *args, Py_ssize_t nargs,
 exit:
     return return_value;
 }
-/*[clinic end generated code: output=0033af17965b51b4 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=5d100b3d74f2a0f4 input=a9049054013a1b77]*/