MST

星途 面试题库

面试题:Rust条件变量在复杂并发场景下的优化与应用

考虑一个分布式系统中的任务调度场景,多个节点的线程需要竞争执行任务,任务执行的优先级不同。使用Rust的`Condvar`和相关并发原语设计一个高效的任务调度器。描述你的设计思路,包括如何处理线程间的竞争、任务优先级的管理、以及如何优化性能以减少锁争用。同时给出核心代码片段并详细解释其实现原理。
13.8万 热度难度
编程语言Rust

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 线程间竞争处理:使用Mutex来保护共享资源,Condvar用于线程间的通知和等待。当任务队列中没有任务时,工作线程通过Condvar进入等待状态,当有新任务加入时,通过Condvar唤醒等待的线程。
  2. 任务优先级管理:使用一个优先队列(如BinaryHeap)来存储任务,优先队列根据任务的优先级进行排序,高优先级的任务在队列头部。
  3. 减少锁争用优化
    • 减少持有锁的时间,例如在获取任务后尽快释放锁,让其他线程有机会操作任务队列。
    • 使用细粒度锁,不过在这个场景下,由于任务队列是共享资源,整体使用一个锁是较为合理的,但尽量减少锁内的操作。

核心代码片段

use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::collections::BinaryHeap;

// 定义任务结构体
struct Task {
    priority: u32,
    // 可以在这里添加任务的具体内容,例如一个闭包
}

impl Ord for Task {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.priority.cmp(&self.priority)
    }
}

impl PartialOrd for Task {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Eq for Task {}

impl PartialEq for Task {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

fn main() {
    let task_queue = Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new()));
    let task_queue_clone = task_queue.clone();

    // 创建工作线程
    let worker = thread::spawn(move || {
        loop {
            let (lock, cvar) = &*task_queue_clone;
            let mut queue = lock.lock().unwrap();
            while queue.is_empty() {
                queue = cvar.wait(queue).unwrap();
            }
            let task = queue.pop().unwrap();
            drop(queue); // 尽快释放锁
            // 执行任务
            println!("执行任务,优先级: {}", task.priority);
        }
    });

    // 模拟添加任务
    let (lock, cvar) = &*task_queue;
    let mut queue = lock.lock().unwrap();
    queue.push(Task { priority: 3 });
    queue.push(Task { priority: 1 });
    queue.push(Task { priority: 2 });
    cvar.notify_one();
    drop(queue);

    worker.join().unwrap();
}

代码实现原理

  1. 任务结构体Task结构体包含任务的优先级priority,这里省略了任务具体执行内容,可以根据实际情况添加,例如一个闭包。Task实现了OrdPartialOrdEqPartialEq trait,以便在优先队列中进行排序。
  2. 线程创建:通过thread::spawn创建一个工作线程,该线程从共享的任务队列中获取任务并执行。
  3. 任务队列:使用Arc来共享(Mutex<BinaryHeap<Task>>, Condvar)Mutex保护BinaryHeap类型的任务队列,Condvar用于线程间的通知。
  4. 工作线程逻辑:工作线程通过lock.lock().unwrap()获取锁,进入临界区。如果任务队列为空,通过cvar.wait(queue).unwrap()进入等待状态并释放锁,当被唤醒后重新获取锁。当获取到任务后,尽快释放锁(通过drop(queue)),然后执行任务。
  5. 添加任务逻辑:主线程中获取锁,向任务队列添加任务,然后通过cvar.notify_one()唤醒一个等待的线程。