面试题答案
一键面试import concurrent.futures
import os
def count_lines_in_file(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as file:
return len(file.readlines())
except FileNotFoundError:
print(f"文件 {file_path} 未找到")
return 0
except UnicodeDecodeError:
print(f"文件 {file_path} 编码解析错误")
return 0
def count_lines_in_files_concurrently(file_paths):
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(executor.map(count_lines_in_file, file_paths))
for i, count in enumerate(results):
print(f"文件 {file_paths[i]} 的行数为: {count}")
if __name__ == '__main__':
file_paths = ['file1.txt', 'file2.txt', 'file3.txt'] # 替换为实际文件路径
count_lines_in_files_concurrently(file_paths)
说明:
count_lines_in_file
函数负责打开单个文件并统计其行数,同时处理文件未找到和编码解析错误的异常。count_lines_in_files_concurrently
函数使用concurrent.futures
库中的ProcessPoolExecutor
创建进程池,并使用executor.map
方法并发执行对每个文件的行数统计操作。- 在
if __name__ == '__main__':
块中,定义要读取的文件路径列表并调用count_lines_in_files_concurrently
函数。注意,在Windows系统下使用multiprocessing
相关模块时,需要将相关代码放在if __name__ == '__main__':
块中。
如果使用 multiprocessing
模块,代码如下:
import multiprocessing
import os
def count_lines_in_file(file_path):
try:
with open(file_path, 'r', encoding='utf-8') as file:
return len(file.readlines())
except FileNotFoundError:
print(f"文件 {file_path} 未找到")
return 0
except UnicodeDecodeError:
print(f"文件 {file_path} 编码解析错误")
return 0
if __name__ == '__main__':
file_paths = ['file1.txt', 'file2.txt', 'file3.txt'] # 替换为实际文件路径
pool = multiprocessing.Pool()
results = pool.map(count_lines_in_file, file_paths)
pool.close()
pool.join()
for i, count in enumerate(results):
print(f"文件 {file_paths[i]} 的行数为: {count}")
这里使用 multiprocessing.Pool
创建进程池,并通过 pool.map
方法实现并发操作,同样要将相关代码放在 if __name__ == '__main__':
块中。