MST

星途 面试题库

面试题:Python多进程应用之专家难度:动态多进程任务分配与监控

假设有一个任务集合,每个任务的执行时间不同且动态变化。使用Python的`multiprocessing`模块设计一个系统,能够动态地将任务分配给可用的进程,同时实时监控每个进程的执行状态(如运行中、完成、出错),并且当某个进程出现错误时,能够自动重新分配该进程未完成的任务到其他可用进程。请详细描述设计架构、关键数据结构以及核心代码逻辑。
38.6万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

设计架构

  1. 任务队列:用于存储待分配的任务,使用multiprocessing.Queue实现。
  2. 结果队列:用于收集每个进程执行任务后的结果,使用multiprocessing.Queue实现。
  3. 进程池:管理多个进程,使用multiprocessing.Pool实现。
  4. 状态监控:通过进程的is_alive()方法判断进程是否在运行,结合结果队列中的返回值判断任务是否完成或出错。

关键数据结构

  1. 任务:可以用字典表示,例如{"task_id": 1, "execution_time": 10},其中task_id是任务的唯一标识,execution_time是任务预计执行时间。
  2. 进程状态:用字典记录每个进程的状态,例如{"process_id": 1, "status": "running"}

核心代码逻辑

import multiprocessing
import time


def execute_task(task):
    try:
        # 模拟任务执行
        time.sleep(task["execution_time"])
        return {"task_id": task["task_id"], "status": "completed"}
    except Exception as e:
        return {"task_id": task["task_id"], "status": "error", "error": str(e)}


def monitor_processes(pool, task_queue, result_queue, process_status):
    while True:
        for process in pool._pool:
            if not process.is_alive():
                process_status[process.pid] = "finished"
                # 处理进程结束后的任务重新分配
                if process_status[process.pid] == "finished":
                    while not task_queue.empty():
                        task = task_queue.get()
                        new_process = multiprocessing.Process(target=execute_task, args=(task,))
                        new_process.start()
                        process_status[new_process.pid] = "running"
        if result_queue.empty():
            time.sleep(1)
            continue
        result = result_queue.get()
        if result["status"] == "completed":
            print(f"Task {result['task_id']} completed.")
        else:
            print(f"Task {result['task_id']} error: {result['error']}")
            # 重新分配出错任务
            task = {"task_id": result['task_id'], "execution_time": 10}  # 假设重新分配的任务时间
            task_queue.put(task)


def main():
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    process_status = {}
    num_processes = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=num_processes)

    # 初始化任务队列
    tasks = [{"task_id": i, "execution_time": i} for i in range(10)]
    for task in tasks:
        task_queue.put(task)

    # 启动进程执行任务
    for _ in range(num_processes):
        task = task_queue.get()
        process = multiprocessing.Process(target=lambda q, t: q.put(execute_task(t)), args=(result_queue, task))
        process.start()
        process_status[process.pid] = "running"

    monitor_processes(pool, task_queue, result_queue, process_status)


if __name__ == '__main__':
    main()
  1. execute_task函数:模拟任务执行,根据任务的执行时间休眠相应时间,并返回任务执行结果,包括任务ID和执行状态(完成或出错)。
  2. monitor_processes函数:监控进程状态,当进程结束时,判断任务执行结果,若出错则将任务重新加入任务队列,同时将新任务分配给新的进程。
  3. main函数:初始化任务队列、结果队列和进程池,将任务放入任务队列,启动进程执行任务,并调用monitor_processes函数进行监控。