MST

星途 面试题库

面试题:Rust中条件变量(Condvar)与Mutex协同实现复杂同步逻辑

假设有一个场景,多个生产者线程往一个共享队列中添加数据,多个消费者线程从该队列中取出数据。当队列为空时,消费者线程需要等待;当队列满时,生产者线程需要等待。请使用Rust的Condvar和Mutex来实现这个同步逻辑,并解释代码中每一步的作用及原理。
39.3万 热度难度
编程语言Rust

知识考点

AI 面试

面试题答案

一键面试
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct SharedQueue<T> {
    queue: Vec<T>,
    capacity: usize,
}

impl<T> SharedQueue<T> {
    fn new(capacity: usize) -> Self {
        SharedQueue {
            queue: Vec::new(),
            capacity,
        }
    }
}

fn main() {
    let shared_queue = Arc::new((Mutex::new(SharedQueue::new(5)), Condvar::new()));
    let producer_handles: Vec<_> = (0..2).map(|i| {
        let shared_queue = shared_queue.clone();
        thread::spawn(move || {
            let (lock, cvar) = &*shared_queue;
            for j in 0..5 {
                let mut queue = lock.lock().unwrap();
                while queue.queue.len() >= queue.capacity {
                    queue = cvar.wait(queue).unwrap();
                }
                queue.queue.push(j * 10 + i);
                println!("Producer {} added {}", i, j * 10 + i);
                cvar.notify_one();
            }
        })
    }).collect();

    let consumer_handles: Vec<_> = (0..2).map(|i| {
        let shared_queue = shared_queue.clone();
        thread::spawn(move || {
            let (lock, cvar) = &*shared_queue;
            for _ in 0..5 {
                let mut queue = lock.lock().unwrap();
                while queue.queue.is_empty() {
                    queue = cvar.wait(queue).unwrap();
                }
                let item = queue.queue.remove(0);
                println!("Consumer {} removed {}", i, item);
                cvar.notify_one();
            }
        })
    }).collect();

    for handle in producer_handles {
        handle.join().unwrap();
    }
    for handle in consumer_handles {
        handle.join().unwrap();
    }
}

代码解释

  1. 定义共享队列结构体
    • SharedQueue 结构体包含一个 Vec<T> 用于存储数据,以及一个 usize 类型的 capacity 表示队列的最大容量。
    • new 方法用于初始化 SharedQueue,创建一个空的 Vec 并设置容量。
  2. 主线程部分
    • 创建共享资源
      • 使用 Arc 来跨线程共享 (Mutex<SharedQueue<T>>, Condvar) 元组。Arc 允许在多个线程间共享数据,Mutex 用于保护 SharedQueueCondvar 用于线程间的条件变量同步。
    • 生产者线程
      • 循环创建生产者线程,每个生产者线程向队列中添加5个数据。
      • 获取 Mutex 的锁 lock.lock().unwrap(),如果队列已满 queue.queue.len() >= queue.capacity,则调用 cvar.wait(queue).unwrap() 等待条件变量通知,同时释放锁,使得其他线程可以操作队列。
      • 当被唤醒且队列不满时,向队列中添加数据,并通过 cvar.notify_one() 唤醒一个等待的线程(可能是消费者线程)。
    • 消费者线程
      • 循环创建消费者线程,每个消费者线程从队列中取出5个数据。
      • 获取 Mutex 的锁 lock.lock().unwrap(),如果队列为空 queue.queue.is_empty(),则调用 cvar.wait(queue).unwrap() 等待条件变量通知,同时释放锁,使得其他线程可以操作队列。
      • 当被唤醒且队列不为空时,从队列中取出数据,并通过 cvar.notify_one() 唤醒一个等待的线程(可能是生产者线程)。
  3. 等待线程结束
    • 使用 join 方法等待所有生产者和消费者线程完成任务。

原理

  1. Mutex:互斥锁用于保护共享资源 SharedQueue,确保同一时间只有一个线程可以访问队列,防止数据竞争。
  2. Condvar:条件变量用于线程间的同步。当队列满或空时,相应的生产者或消费者线程通过 wait 方法等待,同时释放 Mutex 的锁。当有数据添加或移除队列时,通过 notify_one 方法唤醒一个等待的线程,被唤醒的线程重新获取 Mutex 的锁后继续执行。