设计架构
- 任务队列:用于存储待分配的任务,使用
multiprocessing.Queue
实现。
- 结果队列:用于收集每个进程执行任务后的结果,使用
multiprocessing.Queue
实现。
- 进程池:管理多个进程,使用
multiprocessing.Pool
实现。
- 状态监控:通过进程的
is_alive()
方法判断进程是否在运行,结合结果队列中的返回值判断任务是否完成或出错。
关键数据结构
- 任务:可以用字典表示,例如
{"task_id": 1, "execution_time": 10}
,其中task_id
是任务的唯一标识,execution_time
是任务预计执行时间。
- 进程状态:用字典记录每个进程的状态,例如
{"process_id": 1, "status": "running"}
。
核心代码逻辑
import multiprocessing
import time
def execute_task(task):
try:
# 模拟任务执行
time.sleep(task["execution_time"])
return {"task_id": task["task_id"], "status": "completed"}
except Exception as e:
return {"task_id": task["task_id"], "status": "error", "error": str(e)}
def monitor_processes(pool, task_queue, result_queue, process_status):
while True:
for process in pool._pool:
if not process.is_alive():
process_status[process.pid] = "finished"
# 处理进程结束后的任务重新分配
if process_status[process.pid] == "finished":
while not task_queue.empty():
task = task_queue.get()
new_process = multiprocessing.Process(target=execute_task, args=(task,))
new_process.start()
process_status[new_process.pid] = "running"
if result_queue.empty():
time.sleep(1)
continue
result = result_queue.get()
if result["status"] == "completed":
print(f"Task {result['task_id']} completed.")
else:
print(f"Task {result['task_id']} error: {result['error']}")
# 重新分配出错任务
task = {"task_id": result['task_id'], "execution_time": 10} # 假设重新分配的任务时间
task_queue.put(task)
def main():
task_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()
process_status = {}
num_processes = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=num_processes)
# 初始化任务队列
tasks = [{"task_id": i, "execution_time": i} for i in range(10)]
for task in tasks:
task_queue.put(task)
# 启动进程执行任务
for _ in range(num_processes):
task = task_queue.get()
process = multiprocessing.Process(target=lambda q, t: q.put(execute_task(t)), args=(result_queue, task))
process.start()
process_status[process.pid] = "running"
monitor_processes(pool, task_queue, result_queue, process_status)
if __name__ == '__main__':
main()
execute_task
函数:模拟任务执行,根据任务的执行时间休眠相应时间,并返回任务执行结果,包括任务ID和执行状态(完成或出错)。
monitor_processes
函数:监控进程状态,当进程结束时,判断任务执行结果,若出错则将任务重新加入任务队列,同时将新任务分配给新的进程。
main
函数:初始化任务队列、结果队列和进程池,将任务放入任务队列,启动进程执行任务,并调用monitor_processes
函数进行监控。