The server is closed asynchronously, use the :meth:`wait_closed`
coroutine to wait until the server is closed.
+ .. method:: get_loop()
+
+ Gives the event loop associated with the server object.
+
+ .. versionadded:: 3.7
+
.. coroutinemethod:: wait_closed()
Wait until the :meth:`close` method completes.
"""Coroutine to wait until service is closed."""
return NotImplemented
+ def get_loop(self):
+ """ Get the event loop the Server object is attached to."""
+ return NotImplemented
+
class AbstractEventLoop:
"""Abstract event loop."""
get_running_loop_impl = events._c_get_running_loop
get_event_loop_impl = events._c_get_event_loop
+class TestServer(unittest.TestCase):
+
+ def test_get_loop(self):
+ loop = asyncio.new_event_loop()
+ proto = MyProto(loop)
+ server = loop.run_until_complete(loop.create_server(lambda: proto, '0.0.0.0', 0))
+ self.assertEqual(server.get_loop(), loop)
+ loop.close()
+
+class TestAbstractServer(unittest.TestCase):
+
+ def test_get_loop(self):
+ self.assertEqual(events.AbstractServer().get_loop(), NotImplemented)
if __name__ == '__main__':
unittest.main()