面试题答案
一键面试import asyncio
async def sub_coroutine():
try:
await asyncio.sleep(10)
print('子协程正常完成')
except asyncio.CancelledError:
print('子协程被取消')
async def main_coroutine():
tasks = []
for _ in range(3):
task = asyncio.create_task(sub_coroutine())
tasks.append(task)
try:
await asyncio.wait(tasks, timeout=2)
except asyncio.TimeoutError:
print('超时,取消所有子任务')
for task in tasks:
task.cancel()
# 等待所有任务处理取消异常
await asyncio.gather(*tasks, return_exceptions=True)
# 这里可以添加清理资源的代码,例如关闭文件、数据库连接等
print('资源清理完成')
if __name__ == "__main__":
asyncio.run(main_coroutine())
代码解释
- 子协程
sub_coroutine
:- 这个协程使用
await asyncio.sleep(10)
模拟一个长时间运行的任务。 - 使用
try - except
块捕获asyncio.CancelledError
异常,当子协程被取消时会抛出这个异常,在except
块中打印提示信息,表明子协程被取消。
- 这个协程使用
- 主协程
main_coroutine
:- 创建一个空列表
tasks
用于存储子任务。 - 使用
for
循环创建多个子任务(这里是3个),并添加到tasks
列表中。 - 使用
asyncio.wait(tasks, timeout = 2)
等待所有子任务完成,设置超时时间为2秒。 - 如果发生
asyncio.TimeoutError
异常,说明超时了。此时遍历tasks
列表,对每个任务调用task.cancel()
方法取消任务。 - 调用
asyncio.gather(*tasks, return_exceptions = True)
等待所有任务处理取消异常。return_exceptions = True
确保即使任务被取消抛出异常,gather
也不会将异常向上传递,而是将异常作为结果返回。 - 在
gather
调用之后,可以添加清理资源的代码,比如关闭文件、数据库连接等。
- 创建一个空列表
if __name__ == "__main__"
:- 使用
asyncio.run(main_coroutine())
运行主协程。
- 使用
通过上述代码和步骤,可以确保在主协程取消时所有子协程都能正确取消,并且在超时发生时能够处理超时异常并清理相关资源。