MST

星途 面试题库

面试题:Python信号量在复杂任务调度中的应用

假设你有一个任务队列,里面的任务分为不同优先级,需要用Python信号量结合线程池来实现:高优先级任务最多同时执行5个,中优先级任务最多同时执行3个,低优先级任务最多同时执行2个。请描述整体设计思路并给出关键代码片段。
17.6万 热度难度
编程语言Python

知识考点

AI 面试

面试题答案

一键面试

整体设计思路

  1. 任务队列:使用Python的queue.PriorityQueue来存储任务,任务按照优先级排序,优先级高的任务优先执行。
  2. 信号量:为不同优先级的任务创建对应的信号量,高优先级任务信号量初始值为5,中优先级为3,低优先级为2,用于限制同一时间执行的任务数量。
  3. 线程池:使用concurrent.futures.ThreadPoolExecutor创建线程池来执行任务。当有任务从队列中取出时,获取对应优先级的信号量,执行完任务后释放信号量。

关键代码片段

import queue
import concurrent.futures
import threading


# 任务队列
task_queue = queue.PriorityQueue()

# 信号量,分别限制不同优先级任务同时执行的数量
high_priority_semaphore = threading.Semaphore(5)
medium_priority_semaphore = threading.Semaphore(3)
low_priority_semaphore = threading.Semaphore(2)


def execute_task(task):
    priority, task_func = task
    if priority == 'high':
        high_priority_semaphore.acquire()
    elif priority =='medium':
        medium_priority_semaphore.acquire()
    else:
        low_priority_semaphore.acquire()
    try:
        task_func()
    finally:
        if priority == 'high':
            high_priority_semaphore.release()
        elif priority =='medium':
            medium_priority_semaphore.release()
        else:
            low_priority_semaphore.release()


def worker():
    while True:
        try:
            task = task_queue.get(block=True)
            execute_task(task)
            task_queue.task_done()
        except queue.Empty:
            break


# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    # 启动线程
    for _ in range(executor._max_workers):
        executor.submit(worker)

    # 模拟添加任务
    def high_priority_task():
        print("执行高优先级任务")


    def medium_priority_task():
        print("执行中优先级任务")


    def low_priority_task():
        print("执行低优先级任务")


    task_queue.put(('high', high_priority_task))
    task_queue.put(('medium', medium_priority_task))
    task_queue.put(('low', low_priority_task))

    # 等待所有任务完成
    task_queue.join()