run_async(foo())
def test_with_7(self):
+ class CM:
+ async def __aenter__(self):
+ return self
+
+ def __aexit__(self, *e):
+ return 444
+
+ async def foo():
+ async with CM():
+ 1/0
+
+ try:
+ run_async(foo())
+ except TypeError as exc:
+ self.assertRegex(
+ exc.args[0], "object int can't be used in 'await' expression")
+ self.assertTrue(exc.__context__ is not None)
+ self.assertTrue(isinstance(exc.__context__, ZeroDivisionError))
+ else:
+ self.fail('invalid asynchronous context manager did not fail')
+
+
+ def test_with_8(self):
+ CNT = 0
+
class CM:
async def __aenter__(self):
return self
return 456
async def foo():
+ nonlocal CNT
async with CM():
- pass
+ CNT += 1
+
with self.assertRaisesRegex(
TypeError, "object int can't be used in 'await' expression"):
run_async(foo())
+ self.assertEqual(CNT, 1)
+
+
+ def test_with_9(self):
+ CNT = 0
+
+ class CM:
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, *e):
+ 1/0
+
+ async def foo():
+ nonlocal CNT
+ async with CM():
+ CNT += 1
+
+ with self.assertRaises(ZeroDivisionError):
+ run_async(foo())
+
+ self.assertEqual(CNT, 1)
+
+ def test_with_10(self):
+ CNT = 0
+
+ class CM:
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, *e):
+ 1/0
+
+ async def foo():
+ nonlocal CNT
+ async with CM():
+ async with CM():
+ raise RuntimeError
+
+ try:
+ run_async(foo())
+ except ZeroDivisionError as exc:
+ self.assertTrue(exc.__context__ is not None)
+ self.assertTrue(isinstance(exc.__context__, ZeroDivisionError))
+ self.assertTrue(isinstance(exc.__context__.__context__,
+ RuntimeError))
+ else:
+ self.fail('exception from __aexit__ did not propagate')
+
+ def test_with_11(self):
+ CNT = 0
+
+ class CM:
+ async def __aenter__(self):
+ raise NotImplementedError
+
+ async def __aexit__(self, *e):
+ 1/0
+
+ async def foo():
+ nonlocal CNT
+ async with CM():
+ raise RuntimeError
+
+ try:
+ run_async(foo())
+ except NotImplementedError as exc:
+ self.assertTrue(exc.__context__ is None)
+ else:
+ self.fail('exception from __aenter__ did not propagate')
+
+ def test_with_12(self):
+ CNT = 0
+
+ class CM:
+ async def __aenter__(self):
+ return self
+
+ async def __aexit__(self, *e):
+ return True
+
+ async def foo():
+ nonlocal CNT
+ async with CM() as cm:
+ self.assertIs(cm.__class__, CM)
+ raise RuntimeError
+
+ run_async(foo())
+
def test_for_1(self):
aiter_calls = 0