MST

星途 面试题库

面试题:Python标准库在类编程高级场景下的应用问题

假设有一个分布式计算任务,需要使用Python类编程结合标准库实现任务的分发、执行和结果收集。每个计算节点可能会出现故障,要设计一种机制保证任务能够成功执行。请详细描述整体架构设计,包括使用哪些标准库模块来实现进程间通信、容错处理等功能,并给出核心代码的框架。
27.1万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

整体架构设计

  1. 任务分发器(Dispatcher):负责将任务分发给各个计算节点。
  2. 计算节点(Worker):接收任务并执行,将结果返回给任务分发器。
  3. 结果收集器(Collector):负责收集计算节点返回的结果。

标准库模块

  1. multiprocessing:用于进程间通信和管理进程。可以创建进程、队列用于进程间数据传递。
  2. queue:提供线程安全的队列实现,在multiprocessing中使用Queue进行进程间通信。
  3. time:用于处理时间相关操作,例如设置重试间隔。

容错处理机制

  1. 心跳检测:计算节点定期向任务分发器发送心跳信号,任务分发器通过心跳判断节点是否存活。
  2. 任务重试:如果某个计算节点在规定时间内没有返回结果,任务分发器将该任务重新分配给其他可用的计算节点。

核心代码框架

import multiprocessing
import time


class Task:
    def __init__(self, task_id, data):
        self.task_id = task_id
        self.data = data


class Result:
    def __init__(self, task_id, result):
        self.task_id = task_id
        self.result = result


def worker(worker_id, task_queue, result_queue):
    while True:
        task = task_queue.get()
        if task is None:
            break
        try:
            # 模拟任务执行
            result = task.data * 2
            result_queue.put(Result(task.task_id, result))
        except Exception as e:
            print(f"Worker {worker_id} failed to process task {task.task_id}: {e}")


def dispatcher(task_list, num_workers):
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = []

    for i in range(num_workers):
        p = multiprocessing.Process(target=worker, args=(i, task_queue, result_queue))
        p.start()
        workers.append(p)

    for task in task_list:
        task_queue.put(task)

    # 模拟心跳检测和任务重试
    task_status = {task.task_id: False for task in task_list}
    while not all(task_status.values()):
        while not result_queue.empty():
            result = result_queue.get()
            task_status[result.task_id] = True
        for task in task_list:
            if not task_status[task.task_id]:
                # 模拟超时,这里简单设置为1秒
                time.sleep(1)
                task_queue.put(task)

    for _ in range(num_workers):
        task_queue.put(None)
    for p in workers:
        p.join()

    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    return results


if __name__ == '__main__':
    tasks = [Task(i, i) for i in range(10)]
    final_results = dispatcher(tasks, 3)
    print(final_results)