MST
星途 面试题库

面试题:Rust条件变量在复杂生产者 - 消费者模型中的应用

假设有一个复杂的生产者 - 消费者模型,生产者和消费者线程数量不固定,且消费者需要根据不同的条件进行唤醒消费。请用Rust编写代码实现这个模型,要求合理使用条件变量来保证线程间的正确同步,同时说明在实现过程中如何处理缓存溢出、线程安全等问题。
49.9万 热度难度
编程语言Rust

知识考点

AI 面试

面试题答案

一键面试
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;

// 定义生产者 - 消费者模型的结构体
struct ProducerConsumer {
    buffer: VecDeque<i32>,
    capacity: usize,
    cond_producer: Condvar,
    cond_consumer: Condvar,
    mutex: Mutex<()>,
}

impl ProducerConsumer {
    fn new(capacity: usize) -> Self {
        ProducerConsumer {
            buffer: VecDeque::new(),
            capacity,
            cond_producer: Condvar::new(),
            cond_consumer: Condvar::new(),
            mutex: Mutex::new(()),
        }
    }

    // 生产者方法
    fn produce(&self, item: i32) {
        let _lock = self.mutex.lock().unwrap();
        while self.buffer.len() >= self.capacity {
            let _lock = self.cond_producer.wait(_lock).unwrap();
        }
        self.buffer.push_back(item);
        self.cond_consumer.notify_one();
    }

    // 消费者方法,根据条件唤醒消费
    fn consume(&self, condition: fn(i32) -> bool) -> Option<i32> {
        let _lock = self.mutex.lock().unwrap();
        while self.buffer.is_empty() {
            let _lock = self.cond_consumer.wait(_lock).unwrap();
        }
        let item = self.buffer.pop_front();
        if let Some(item) = item {
            if condition(item) {
                self.cond_producer.notify_one();
                Some(item)
            } else {
                self.buffer.push_front(item);
                None
            }
        } else {
            None
        }
    }
}

fn main() {
    let pc = Arc::new(ProducerConsumer::new(10));
    let pc_producer = pc.clone();
    let producer_handle = thread::spawn(move || {
        for i in 0..20 {
            pc_producer.produce(i);
            println!("Produced: {}", i);
        }
    });

    let pc_consumer = pc.clone();
    let consumer_handle = thread::spawn(move || {
        let condition = |x| x % 2 == 0;
        while let Some(item) = pc_consumer.consume(condition) {
            println!("Consumed: {}", item);
        }
    });

    producer_handle.join().unwrap();
    consumer_handle.join().unwrap();
}

处理缓存溢出

  1. 容量限制:在ProducerConsumer结构体中定义了capacity字段,表示缓存的最大容量。在produce方法中,使用while self.buffer.len() >= self.capacity来检查缓存是否已满。如果已满,生产者线程会通过self.cond_producer.wait(_lock)进入等待状态,直到消费者消费了数据,通过self.cond_producer.notify_one()唤醒生产者。
  2. 唤醒机制:消费者消费数据后,会调用self.cond_producer.notify_one()唤醒一个等待的生产者线程,这样生产者就可以继续生产数据,避免缓存一直处于满的状态。

处理线程安全

  1. 互斥锁:使用Mutex来保护共享资源buffer。在produceconsume方法中,首先通过self.mutex.lock().unwrap()获取锁,确保在同一时间只有一个线程可以访问和修改buffer
  2. 条件变量:使用Condvarcond_producercond_consumer)来实现线程间的同步。生产者在缓存满时等待cond_producer条件变量,消费者在缓存空时等待cond_consumer条件变量。当条件满足时,通过notify_one方法唤醒相应等待的线程。
  3. 所有权和引用:在多线程环境下,使用Arc(原子引用计数)来共享ProducerConsumer实例,确保每个线程都可以安全地访问和操作共享资源,同时避免了所有权转移带来的问题。