整体设计思路
- 任务队列:使用Python的
queue.PriorityQueue
来存储任务,任务按照优先级排序,优先级高的任务优先执行。
- 信号量:为不同优先级的任务创建对应的信号量,高优先级任务信号量初始值为5,中优先级为3,低优先级为2,用于限制同一时间执行的任务数量。
- 线程池:使用
concurrent.futures.ThreadPoolExecutor
创建线程池来执行任务。当有任务从队列中取出时,获取对应优先级的信号量,执行完任务后释放信号量。
关键代码片段
import queue
import concurrent.futures
import threading
# 任务队列
task_queue = queue.PriorityQueue()
# 信号量,分别限制不同优先级任务同时执行的数量
high_priority_semaphore = threading.Semaphore(5)
medium_priority_semaphore = threading.Semaphore(3)
low_priority_semaphore = threading.Semaphore(2)
def execute_task(task):
priority, task_func = task
if priority == 'high':
high_priority_semaphore.acquire()
elif priority =='medium':
medium_priority_semaphore.acquire()
else:
low_priority_semaphore.acquire()
try:
task_func()
finally:
if priority == 'high':
high_priority_semaphore.release()
elif priority =='medium':
medium_priority_semaphore.release()
else:
low_priority_semaphore.release()
def worker():
while True:
try:
task = task_queue.get(block=True)
execute_task(task)
task_queue.task_done()
except queue.Empty:
break
# 创建线程池
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
# 启动线程
for _ in range(executor._max_workers):
executor.submit(worker)
# 模拟添加任务
def high_priority_task():
print("执行高优先级任务")
def medium_priority_task():
print("执行中优先级任务")
def low_priority_task():
print("执行低优先级任务")
task_queue.put(('high', high_priority_task))
task_queue.put(('medium', medium_priority_task))
task_queue.put(('low', low_priority_task))
# 等待所有任务完成
task_queue.join()