self.loop.run_forever()
self.loop._selector.select.assert_called_once_with(0)
+ async def leave_unfinalized_asyncgen(self):
+ # Create an async generator, iterate it partially, and leave it
+ # to be garbage collected.
+ # Used in async generator finalization tests.
+ # Depends on implementation details of garbage collector. Changes
+ # in gc may break this function.
+ status = {'started': False,
+ 'stopped': False,
+ 'finalized': False}
+
+ async def agen():
+ status['started'] = True
+ try:
+ for item in ['ZERO', 'ONE', 'TWO', 'THREE', 'FOUR']:
+ yield item
+ finally:
+ status['finalized'] = True
+
+ ag = agen()
+ ai = ag.__aiter__()
+
+ async def iter_one():
+ try:
+ item = await ai.__anext__()
+ except StopAsyncIteration:
+ return
+ if item == 'THREE':
+ status['stopped'] = True
+ return
+ asyncio.create_task(iter_one())
+
+ asyncio.create_task(iter_one())
+ return status
+
+ def test_asyncgen_finalization_by_gc(self):
+ # Async generators should be finalized when garbage collected.
+ self.loop._process_events = mock.Mock()
+ self.loop._write_to_self = mock.Mock()
+ with support.disable_gc():
+ status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
+ while not status['stopped']:
+ test_utils.run_briefly(self.loop)
+ self.assertTrue(status['started'])
+ self.assertTrue(status['stopped'])
+ self.assertFalse(status['finalized'])
+ support.gc_collect()
+ test_utils.run_briefly(self.loop)
+ self.assertTrue(status['finalized'])
+
+ def test_asyncgen_finalization_by_gc_in_other_thread(self):
+ # Python issue 34769: If garbage collector runs in another
+ # thread, async generators will not finalize in debug
+ # mode.
+ self.loop._process_events = mock.Mock()
+ self.loop._write_to_self = mock.Mock()
+ self.loop.set_debug(True)
+ with support.disable_gc():
+ status = self.loop.run_until_complete(self.leave_unfinalized_asyncgen())
+ while not status['stopped']:
+ test_utils.run_briefly(self.loop)
+ self.assertTrue(status['started'])
+ self.assertTrue(status['stopped'])
+ self.assertFalse(status['finalized'])
+ self.loop.run_until_complete(
+ self.loop.run_in_executor(None, support.gc_collect))
+ test_utils.run_briefly(self.loop)
+ self.assertTrue(status['finalized'])
+
class MyProto(asyncio.Protocol):
done = None