面试题答案
一键面试实现思路
- 导入必要的库:引入
asyncio
库用于异步操作,aiofiles
库用于异步文件I/O操作。 - 定义异步读取函数:每个函数负责读取一个文件,使用
aiofiles.open
异步打开文件并读取内容。 - 创建任务列表:将每个文件读取函数包装成
asyncio.Task
并加入任务列表。 - 运行异步任务:使用
asyncio.gather
来并行运行所有任务,并等待所有任务完成。 - 异常处理:在异步函数内部使用
try - except
块捕获文件操作可能出现的异常,如文件不存在等。
代码示例
import asyncio
import aiofiles
async def read_file(file_path):
try:
async with aiofiles.open(file_path, mode='r') as f:
content = await f.read()
return content
except FileNotFoundError:
print(f"文件 {file_path} 不存在")
return None
except Exception as e:
print(f"读取文件 {file_path} 时发生错误: {e}")
return None
async def read_multiple_files(file_paths):
tasks = []
for file_path in file_paths:
task = asyncio.create_task(read_file(file_path))
tasks.append(task)
results = await asyncio.gather(*tasks)
return results
if __name__ == "__main__":
file_paths = ["file1.txt", "file2.txt", "file3.txt"]
loop = asyncio.get_event_loop()
try:
contents = loop.run_until_complete(read_multiple_files(file_paths))
for i, content in enumerate(contents):
if content:
print(f"文件 {file_paths[i]} 的内容: {content}")
finally:
loop.close()
在上述代码中:
read_file
函数负责异步读取单个文件,并处理文件不存在和其他一般性异常。read_multiple_files
函数创建多个读取文件的任务,并等待所有任务完成。- 在
if __name__ == "__main__"
部分,定义了要读取的文件路径列表,并运行异步任务获取文件内容。同时处理了事件循环的开启和关闭。