◐ Shell
clean mode source ↗

[3.8] bpo-32751: Wait for task cancel in asyncio.wait_for() when timeout <= 0 (GH-21895) by elprans · Pull Request #21967 · python/cpython

Expand Up @@ -888,6 +888,9 @@ async def inner(): nonlocal task_done try: await asyncio.sleep(0.2) except asyncio.CancelledError: await asyncio.sleep(0.1) raise finally: task_done = True
Expand All @@ -900,6 +903,34 @@ async def inner():
loop.run_until_complete(foo())
def test_wait_for_waits_for_task_cancellation_w_timeout_0(self): loop = asyncio.new_event_loop() self.addCleanup(loop.close)
task_done = False
async def foo(): async def inner(): nonlocal task_done try: await asyncio.sleep(10) except asyncio.CancelledError: await asyncio.sleep(0.1) raise finally: task_done = True
inner_task = self.new_task(loop, inner()) await asyncio.sleep(0.1) await asyncio.wait_for(inner_task, timeout=0)
with self.assertRaises(asyncio.TimeoutError) as cm: loop.run_until_complete(foo())
self.assertTrue(task_done) chained = cm.exception.__context__ self.assertEqual(type(chained), asyncio.CancelledError)
def test_wait_for_self_cancellation(self): loop = asyncio.new_event_loop() self.addCleanup(loop.close) Expand Down