MST

星途 面试题库

面试题:优化使用Python concurrent.futures模块处理高并发任务

假设你需要使用Python的concurrent.futures模块处理大量高并发I/O任务,任务数量可能达到数千甚至上万,每个任务需要与外部服务进行多次交互并处理响应数据。请阐述你会如何进行优化,以提高整体的执行效率和稳定性,包括但不限于线程/进程池大小的设置、资源管理、异常处理以及如何避免潜在的死锁或资源竞争问题,并给出相应的代码框架示例。
12.9万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

优化策略

  1. 线程/进程池大小设置
    • 对于I/O密集型任务,通常使用线程池。线程池大小可以根据CPU核心数和I/O等待时间估算。一般经验法则是设置为CPU核心数的数倍(例如4 - 8倍),假设CPU核心数为num_cpus,线程池大小pool_size = num_cpus * 4。可以通过multiprocessing.cpu_count()获取CPU核心数。
  2. 资源管理
    • 尽量复用与外部服务的连接。例如,如果是HTTP请求,可以使用requests.Session对象,它会自动管理连接池,减少连接建立和关闭的开销。
    • 对于其他资源(如数据库连接等),同样采用连接池等技术进行复用。
  3. 异常处理
    • 在任务函数中捕获可能的异常,如网络异常(requests库中的RequestException)、外部服务返回错误等。使用try - except块捕获异常,并记录详细的错误信息,以便调试。
    • 在提交任务到线程池后,使用as_completedresult方法获取任务结果时,也需要处理可能的异常,如concurrent.futures.TimeoutError(任务超时)、concurrent.futures.CancelledError(任务被取消)等。
  4. 避免死锁或资源竞争
    • 确保共享资源(如连接池、全局变量等)的访问是线程安全的。对于共享资源的操作,可以使用threading.Lockqueue.Queue等线程安全的数据结构。
    • 避免循环依赖资源,在设计任务逻辑时,要仔细规划资源的获取和释放顺序。

代码框架示例

import concurrent.futures
import requests
import multiprocessing


def process_task(task_data, session):
    try:
        # 与外部服务进行多次交互
        response1 = session.get('http://example.com/api1', params=task_data)
        response1.raise_for_status()
        data1 = response1.json()

        response2 = session.post('http://example.com/api2', json=data1)
        response2.raise_for_status()
        data2 = response2.json()

        # 处理响应数据
        result = data2['key']
        return result
    except requests.RequestException as e:
        # 记录网络相关异常
        print(f"Network error in task: {e}")
    except KeyError as e:
        # 记录数据解析异常
        print(f"Data parsing error in task: {e}")


def main():
    num_cpus = multiprocessing.cpu_count()
    pool_size = num_cpus * 4
    tasks = [{"param1": "value1"}, {"param2": "value2"}]  # 假设任务数据列表
    with requests.Session() as session:
        with concurrent.futures.ThreadPoolExecutor(max_workers=pool_size) as executor:
            futures = [executor.submit(process_task, task_data, session) for task_data in tasks]
            for future in concurrent.futures.as_completed(futures):
                try:
                    result = future.result()
                    print(f"Task result: {result}")
                except concurrent.futures.TimeoutError:
                    print("Task timed out")
                except concurrent.futures.CancelledError:
                    print("Task was cancelled")


if __name__ == '__main__':
    main()