设计思路
- 任务结构体定义:定义一个结构体来表示任务,包含任务ID、优先级、依赖任务ID列表等字段。
- 优先级队列:使用
PriorityQueue
(来自priority_queue
crate)来存储任务,根据任务优先级排序。
- 依赖关系处理:通过一个
HashMap
来存储每个任务的依赖任务完成情况。
- 线程同步:利用
Mutex
来保护共享资源,如任务队列和依赖关系表。Condvar
用于线程间的通知,当有新任务可执行时通知等待的消费者线程。
代码实现
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use priority_queue::PriorityQueue;
// 定义任务结构体
#[derive(Debug)]
struct Task {
id: u32,
priority: u32,
dependencies: Vec<u32>,
}
fn main() {
let task_queue = Arc::new((Mutex::new(PriorityQueue::new()), Condvar::new()));
let dep_map = Arc::new(Mutex::new(HashMap::new()));
// 创建生产者线程
let producer_task_queue = task_queue.clone();
let producer_dep_map = dep_map.clone();
thread::spawn(move || {
// 模拟添加任务
let tasks = vec![
Task { id: 1, priority: 3, dependencies: vec![] },
Task { id: 2, priority: 2, dependencies: vec![1] },
Task { id: 3, priority: 1, dependencies: vec![1, 2] },
];
for task in tasks {
let (lock, cvar) = &*producer_task_queue;
let mut queue = lock.lock().unwrap();
queue.push(task.id, task.priority);
producer_dep_map.lock().unwrap().insert(task.id, task.dependencies);
cvar.notify_one();
}
});
// 创建消费者线程
let consumer_task_queue = task_queue.clone();
let consumer_dep_map = dep_map.clone();
thread::spawn(move || {
loop {
let (lock, cvar) = &*consumer_task_queue;
let mut queue = lock.lock().unwrap();
while queue.is_empty() {
queue = cvar.wait(queue).unwrap();
}
let (task_id, _) = queue.pop().unwrap();
let dep_map = consumer_dep_map.lock().unwrap();
let dependencies = dep_map.get(&task_id).unwrap();
let all_done = dependencies.iter().all(|dep| {
let dep_map = consumer_dep_map.lock().unwrap();
dep_map.get(dep).map_or(true, |deps| deps.is_empty())
});
if all_done {
println!("Consuming task: {}", task_id);
// 模拟任务完成,移除依赖
let mut dep_map = consumer_dep_map.lock().unwrap();
dep_map.remove(&task_id);
for (_, deps) in dep_map.iter_mut() {
deps.retain(|&id| id != task_id);
}
} else {
queue.push(task_id, 0); // 重新放回队列,等待依赖完成
}
}
});
// 主线程等待一会儿
thread::sleep(std::time::Duration::from_secs(5));
}
错误处理说明
Mutex
和Condvar
操作:在获取锁和等待条件变量时,使用unwrap
处理可能的错误。在实际生产环境中,可以更优雅地处理这些错误,例如使用expect
并提供更详细的错误信息,或者使用match
语句进行错误处理。
PriorityQueue
操作:PriorityQueue
的pop
操作返回Option
类型,这里通过unwrap
处理,假设队列不为空。在实际应用中,需要根据具体情况处理None
的情况,比如在队列为空时进行等待。
HashMap
操作:在获取任务依赖和移除任务依赖时,使用unwrap
处理可能的None
情况。同样,在实际生产中应更妥善处理,比如在插入任务时确保依赖任务已经存在,或者在获取依赖时检查是否存在并进行相应处理。