import asyncio
import signal
import contextlib
async def sub_coroutine(ctx, task_name):
try:
while not ctx.canceled():
await asyncio.sleep(1)
print(f"{task_name} is running...")
except asyncio.CancelledError:
print(f"{task_name} was cancelled.")
async def sub_function(ctx, task_name):
await asyncio.gather(sub_coroutine(ctx, task_name))
async def main():
ctx, cancel = contextlib.asynccontextmanager(asyncio.get_running_loop().create_task)(asyncio.timeout(5))
try:
await asyncio.gather(
sub_function(ctx, "Task1"),
sub_function(ctx, "Task2")
)
except asyncio.TimeoutError:
print("Timeout, cancelling tasks...")
cancel()
finally:
await ctx.aclose()
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGINT, loop.stop)
try:
loop.run_until_complete(main())
finally:
loop.close()
如何传递context
- 函数参数传递:在
main
函数中创建context
对象ctx
后,通过函数参数将ctx
传递给sub_function
。
- 协程参数传递:
sub_function
函数接收ctx
后,又将其作为参数传递给sub_coroutine
协程。这样在每个子函数和协程中都能获取到context
对象,当context
被取消时,各个子任务能够捕获asyncio.CancelledError
异常从而正确处理取消操作。