设计思路
- 任务队列:使用
queue.Queue
来存储用户上传的文件任务。它是线程安全的,能确保多个线程可以安全地读写任务。
- 线程池:利用
concurrent.futures.ThreadPoolExecutor
创建线程池。可以通过调整线程池的大小,根据服务器的CPU核心数和内存等资源,合理分配线程数量,避免资源耗尽。
- 任务处理函数:针对不同类型的文件,编写不同的处理函数。例如,对于文本文件的词频统计函数和图片文件的尺寸调整函数。
- 异常处理:在任务处理函数中,使用
try - except
块捕获可能出现的异常,如文件格式不支持、文件读取错误等,并进行适当的处理,如记录日志、返回错误信息给用户。
核心代码框架
import concurrent.futures
import queue
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
# 任务队列
task_queue = queue.Queue()
# 文本文件词频统计函数
def process_text_file(file_path):
try:
word_count = {}
with open(file_path, 'r', encoding='utf - 8') as file:
for line in file:
words = line.split()
for word in words:
if word not in word_count:
word_count[word] = 1
else:
word_count[word] += 1
return word_count
except FileNotFoundError as e:
logging.error(f"文本文件 {file_path} 未找到: {e}")
except UnicodeDecodeError as e:
logging.error(f"文本文件 {file_path} 解码错误: {e}")
# 图片文件尺寸调整函数
def process_image_file(file_path):
try:
from PIL import Image
img = Image.open(file_path)
new_img = img.resize((img.width // 2, img.height // 2))
new_img.save(file_path.replace('.', '_resized.'))
return f"图片 {file_path} 尺寸已调整并保存为 {file_path.replace('.', '_resized.')}"
except FileNotFoundError as e:
logging.error(f"图片文件 {file_path} 未找到: {e}")
except ImportError as e:
logging.error(f"未安装PIL库: {e}")
# 任务处理函数
def process_task():
while True:
try:
file_path, file_type = task_queue.get()
if file_type == 'text':
result = process_text_file(file_path)
elif file_type == 'image':
result = process_image_file(file_path)
else:
logging.warning(f"不支持的文件类型: {file_type}")
task_queue.task_done()
except Exception as e:
logging.error(f"处理任务时出现异常: {e}")
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
# 启动线程处理任务
for _ in range(4):
executor.submit(process_task)
# 模拟添加任务
task_queue.put(('test.txt', 'text'))
task_queue.put(('test.jpg', 'image'))
# 等待所有任务完成
task_queue.join()