raise NotImplementedError(
'cannot submit into a dummy executor')
+ self.loop._process_events = mock.Mock()
+ self.loop._write_to_self = mock.Mock()
+
executor = DummyExecutor()
self.loop.set_default_executor(executor)
self.assertIs(executor, self.loop._default_executor)
with self.assertWarns(DeprecationWarning):
self.loop.set_default_executor(executor)
+ # Avoid cleaning up the executor mock
+ self.loop._default_executor = None
+
def test_call_soon(self):
def cb():
pass
self.loop.set_exception_handler(callback)
# Set corrupted task factory
+ self.addCleanup(self.loop.set_task_factory,
+ self.loop.get_task_factory())
self.loop.set_task_factory(task_factory)
# Run event loop
class TestCase(unittest.TestCase):
@staticmethod
def close_loop(loop):
- executor = loop._default_executor
- if executor is not None:
- executor.shutdown(wait=True)
+ if loop._default_executor is not None:
+ if not loop.is_closed():
+ loop.run_until_complete(loop.shutdown_default_executor())
+ else:
+ loop._default_executor.shutdown(wait=True)
loop.close()
policy = support.maybe_get_event_loop_policy()
if policy is not None: