可能原因分析
- 任务分配策略问题:简单的任务队列分配方式可能导致某些线程先拿到任务,而其他线程等待。例如,先进先出(FIFO)策略在任务大小不均时,可能使大任务长时间占用线程,造成其他线程饥饿。
- 线程竞争资源:如果线程在执行任务过程中需要竞争共享资源(如数据库连接、文件锁等),可能会导致某些线程因资源竞争失败而长时间等待,无法执行任务。
- 动态线程调整机制不合理:在动态调整线程数时,如果没有考虑当前任务队列的状态和线程的负载情况,可能会出现新创建的线程没有任务可做,而繁忙的线程得不到额外的任务分担。
优化方案
- 改进任务分配策略
- 使用加权轮询调度:根据线程的负载能力为每个线程分配不同的权重,按照权重轮流分配任务。这样负载能力强的线程能得到更多任务。
- 关键代码示例:
import threading
import queue
class WeightedRoundRobin:
def __init__(self, weights):
self.weights = weights
self.current_index = 0
self.total_weight = sum(weights)
def get_next_thread(self):
for _ in range(self.total_weight):
index = self.current_index % len(self.weights)
if self.weights[index] > 0:
self.current_index = (index + 1) % len(self.weights)
return index
self.current_index = (index + 1) % len(self.weights)
# 示例使用
weights = [3, 2, 1] # 假设三个线程的权重
wrr = WeightedRoundRobin(weights)
task_queue = queue.Queue()
# 模拟任务添加
for i in range(10):
task_queue.put(f"Task {i}")
threads = []
for i in range(len(weights)):
def worker():
while True:
if not task_queue.empty():
task = task_queue.get()
print(f"Thread {i} is working on {task}")
task_queue.task_done()
t = threading.Thread(target=worker)
threads.append(t)
t.start()
while not task_queue.empty():
index = wrr.get_next_thread()
if not task_queue.empty():
task = task_queue.get()
print(f"Assigning {task} to Thread {index}")
task_queue.task_done()
for t in threads:
t.join()
- 资源管理优化
- 使用资源池:为共享资源创建资源池,线程从资源池获取资源,避免直接竞争。例如,使用连接池管理数据库连接。
- 关键代码示例(以数据库连接池为例,使用
DBUtils
库):
from dbutils.pooled_db import PooledDB
import threading
import queue
import pymysql
# 创建数据库连接池
pool = PooledDB(pymysql, 5, host='localhost', user='user', passwd='password', db='test', port=3306)
task_queue = queue.Queue()
# 模拟任务添加
for i in range(10):
task_queue.put(f"Database Task {i}")
def database_worker():
while True:
if not task_queue.empty():
task = task_queue.get()
conn = pool.connection()
try:
cursor = conn.cursor()
# 执行数据库操作示例
cursor.execute("SELECT VERSION()")
data = cursor.fetchone()
print(f"Thread {threading.current_thread().name} is working on {task}, Database version: {data}")
finally:
conn.close()
task_queue.task_done()
threads = []
for i in range(3):
t = threading.Thread(target=database_worker)
threads.append(t)
t.start()
task_queue.join()
for t in threads:
t.join()
- 动态线程调整优化
- 基于任务队列长度和线程负载动态调整:监控任务队列的长度和每个线程的负载情况(例如,通过记录线程执行任务的时间或当前正在执行的任务数量),当任务队列长度超过一定阈值且部分线程空闲时,动态增加线程;当任务队列长度较短且线程数量过多时,动态减少线程。
- 关键代码示例:
import threading
import queue
import time
task_queue = queue.Queue()
active_threads = []
lock = threading.Lock()
def monitor_and_adjust():
while True:
with lock:
queue_length = task_queue.qsize()
num_active_threads = len(active_threads)
if queue_length > 10 and num_active_threads < 5:
new_thread = threading.Thread(target=worker)
new_thread.start()
active_threads.append(new_thread)
elif queue_length < 5 and num_active_threads > 2:
if active_threads:
thread_to_remove = active_threads.pop()
thread_to_remove.join()
time.sleep(5)
def worker():
while True:
if not task_queue.empty():
task = task_queue.get()
print(f"Thread {threading.current_thread().name} is working on {task}")
time.sleep(1) # 模拟任务执行
task_queue.task_done()
monitor_thread = threading.Thread(target=monitor_and_adjust)
monitor_thread.start()
# 模拟任务添加
for i in range(20):
task_queue.put(f"Task {i}")
task_queue.join()
monitor_thread.join()