设计思路
- 线程间竞争处理:使用
Mutex
来保护共享资源,Condvar
用于线程间的通知和等待。当任务队列中没有任务时,工作线程通过Condvar
进入等待状态,当有新任务加入时,通过Condvar
唤醒等待的线程。
- 任务优先级管理:使用一个优先队列(如
BinaryHeap
)来存储任务,优先队列根据任务的优先级进行排序,高优先级的任务在队列头部。
- 减少锁争用优化:
- 减少持有锁的时间,例如在获取任务后尽快释放锁,让其他线程有机会操作任务队列。
- 使用细粒度锁,不过在这个场景下,由于任务队列是共享资源,整体使用一个锁是较为合理的,但尽量减少锁内的操作。
核心代码片段
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::collections::BinaryHeap;
// 定义任务结构体
struct Task {
priority: u32,
// 可以在这里添加任务的具体内容,例如一个闭包
}
impl Ord for Task {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.priority.cmp(&self.priority)
}
}
impl PartialOrd for Task {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Eq for Task {}
impl PartialEq for Task {
fn eq(&self, other: &Self) -> bool {
self.priority == other.priority
}
}
fn main() {
let task_queue = Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new()));
let task_queue_clone = task_queue.clone();
// 创建工作线程
let worker = thread::spawn(move || {
loop {
let (lock, cvar) = &*task_queue_clone;
let mut queue = lock.lock().unwrap();
while queue.is_empty() {
queue = cvar.wait(queue).unwrap();
}
let task = queue.pop().unwrap();
drop(queue); // 尽快释放锁
// 执行任务
println!("执行任务,优先级: {}", task.priority);
}
});
// 模拟添加任务
let (lock, cvar) = &*task_queue;
let mut queue = lock.lock().unwrap();
queue.push(Task { priority: 3 });
queue.push(Task { priority: 1 });
queue.push(Task { priority: 2 });
cvar.notify_one();
drop(queue);
worker.join().unwrap();
}
代码实现原理
- 任务结构体:
Task
结构体包含任务的优先级priority
,这里省略了任务具体执行内容,可以根据实际情况添加,例如一个闭包。Task
实现了Ord
、PartialOrd
、Eq
和PartialEq
trait,以便在优先队列中进行排序。
- 线程创建:通过
thread::spawn
创建一个工作线程,该线程从共享的任务队列中获取任务并执行。
- 任务队列:使用
Arc
来共享(Mutex<BinaryHeap<Task>>, Condvar)
,Mutex
保护BinaryHeap
类型的任务队列,Condvar
用于线程间的通知。
- 工作线程逻辑:工作线程通过
lock.lock().unwrap()
获取锁,进入临界区。如果任务队列为空,通过cvar.wait(queue).unwrap()
进入等待状态并释放锁,当被唤醒后重新获取锁。当获取到任务后,尽快释放锁(通过drop(queue)
),然后执行任务。
- 添加任务逻辑:主线程中获取锁,向任务队列添加任务,然后通过
cvar.notify_one()
唤醒一个等待的线程。