MST

星途 面试题库

面试题:Rust中使用条件变量实现复杂消费顺序的并发控制

给定一个场景,有多个任务(用结构体表示),每个任务有不同的优先级和依赖关系。在多线程环境下,使用Rust的条件变量(`Condvar`)和互斥锁(`Mutex`)设计一个机制,使得消费者按照任务的优先级和依赖关系依次消费任务。请详细说明设计思路并给出完整的代码实现,包括必要的错误处理。
22.6万 热度难度
编程语言Rust

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务结构体定义:定义一个结构体来表示任务,包含任务ID、优先级、依赖任务ID列表等字段。
  2. 优先级队列:使用PriorityQueue(来自priority_queue crate)来存储任务,根据任务优先级排序。
  3. 依赖关系处理:通过一个HashMap来存储每个任务的依赖任务完成情况。
  4. 线程同步:利用Mutex来保护共享资源,如任务队列和依赖关系表。Condvar用于线程间的通知,当有新任务可执行时通知等待的消费者线程。

代码实现

use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use priority_queue::PriorityQueue;

// 定义任务结构体
#[derive(Debug)]
struct Task {
    id: u32,
    priority: u32,
    dependencies: Vec<u32>,
}

fn main() {
    let task_queue = Arc::new((Mutex::new(PriorityQueue::new()), Condvar::new()));
    let dep_map = Arc::new(Mutex::new(HashMap::new()));

    // 创建生产者线程
    let producer_task_queue = task_queue.clone();
    let producer_dep_map = dep_map.clone();
    thread::spawn(move || {
        // 模拟添加任务
        let tasks = vec![
            Task { id: 1, priority: 3, dependencies: vec![] },
            Task { id: 2, priority: 2, dependencies: vec![1] },
            Task { id: 3, priority: 1, dependencies: vec![1, 2] },
        ];
        for task in tasks {
            let (lock, cvar) = &*producer_task_queue;
            let mut queue = lock.lock().unwrap();
            queue.push(task.id, task.priority);
            producer_dep_map.lock().unwrap().insert(task.id, task.dependencies);
            cvar.notify_one();
        }
    });

    // 创建消费者线程
    let consumer_task_queue = task_queue.clone();
    let consumer_dep_map = dep_map.clone();
    thread::spawn(move || {
        loop {
            let (lock, cvar) = &*consumer_task_queue;
            let mut queue = lock.lock().unwrap();
            while queue.is_empty() {
                queue = cvar.wait(queue).unwrap();
            }
            let (task_id, _) = queue.pop().unwrap();
            let dep_map = consumer_dep_map.lock().unwrap();
            let dependencies = dep_map.get(&task_id).unwrap();
            let all_done = dependencies.iter().all(|dep| {
                let dep_map = consumer_dep_map.lock().unwrap();
                dep_map.get(dep).map_or(true, |deps| deps.is_empty())
            });
            if all_done {
                println!("Consuming task: {}", task_id);
                // 模拟任务完成,移除依赖
                let mut dep_map = consumer_dep_map.lock().unwrap();
                dep_map.remove(&task_id);
                for (_, deps) in dep_map.iter_mut() {
                    deps.retain(|&id| id != task_id);
                }
            } else {
                queue.push(task_id, 0); // 重新放回队列,等待依赖完成
            }
        }
    });

    // 主线程等待一会儿
    thread::sleep(std::time::Duration::from_secs(5));
}

错误处理说明

  1. MutexCondvar操作:在获取锁和等待条件变量时,使用unwrap处理可能的错误。在实际生产环境中,可以更优雅地处理这些错误,例如使用expect并提供更详细的错误信息,或者使用match语句进行错误处理。
  2. PriorityQueue操作PriorityQueuepop操作返回Option类型,这里通过unwrap处理,假设队列不为空。在实际应用中,需要根据具体情况处理None的情况,比如在队列为空时进行等待。
  3. HashMap操作:在获取任务依赖和移除任务依赖时,使用unwrap处理可能的None情况。同样,在实际生产中应更妥善处理,比如在插入任务时确保依赖任务已经存在,或者在获取依赖时检查是否存在并进行相应处理。