GH-86296: Fix for asyncio.wait_for() swallowing cancellation, and add tests by twisteroidambassador · Pull Request #98607 · python/cpython
async def simultaneous_self_cancel_and_inner_result( self, waitfor_timeout, inner_action, ): """Construct scenario where external cancellation and awaitable becoming done happen simultaneously. inner_acion is one of 'cancel', 'exception' or 'result'. Make sure waitfor_timeout > 0.1. Trying to make it == 0.1 is not a reliable way to make timeout happen at the same time as the above. """ loop = asyncio.get_running_loop() inner = loop.create_future() waitfor_task = asyncio.create_task( asyncio.wait_for(inner, timeout=waitfor_timeout)) await asyncio.sleep(0.1) # Even if waitfor_timeout == 0.1, there's still no guarantee whether the # timer handler (or similar) in wait_for() or code below in this # coroutine executes first. If the timer handler executes first, then # inner will be cancelled(), and code below will raise # InvalidStateError. if inner_action == 'cancel': inner.cancel() elif inner_action == 'exception': inner.set_exception(RuntimeError('inner exception')) else: assert inner_action == 'result' inner.set_result('inner result') waitfor_task.cancel() with self.assertRaises(asyncio.CancelledError): return await waitfor_task # Consume inner's exception, to avoid "Future exception was never # retrieved" messages if inner_action == 'exception': self.assertIsInstance(inner.exception(), RuntimeError)
async def test_simultaneous_self_cancel_and_inner_result(self): for timeout in (10, None): with self.subTest(waitfor_timeout=timeout): await self.simultaneous_self_cancel_and_inner_result( timeout, 'result')
async def test_simultaneous_self_cancel_and_inner_exc(self): for timeout in (10, None): with self.subTest(waitfor_timeout=timeout): await self.simultaneous_self_cancel_and_inner_result( timeout, 'exception')
async def test_simultaneous_self_cancel_and_inner_cancel(self): for timeout in (10, None): with self.subTest(waitfor_timeout=timeout): await self.simultaneous_self_cancel_and_inner_result( timeout, 'cancel')
if __name__ == '__main__': unittest.main()