MST

星途 面试题库

面试题:Rust非作用域互斥体与其他并发原语的协同使用及场景分析

在Rust的并发编程中,非作用域互斥体常常需要与其他并发原语(如条件变量`Condvar`、信号量`Semaphore`等)协同工作。请描述一个实际场景,说明如何合理地结合这些原语来实现高效、安全的并发控制,并给出相应的代码实现及详细的设计思路。
15.5万 热度难度
编程语言Rust

知识考点

AI 面试

面试题答案

一键面试

实际场景

假设我们有一个生产者 - 消费者模型,生产者不断生产数据并放入共享队列,消费者从共享队列中取出数据进行处理。这里需要确保对共享队列的访问是线程安全的,并且在队列为空时消费者能等待,队列满时生产者能等待。

设计思路

  1. 使用互斥体(Mutex:用于保护共享队列,确保同一时间只有一个线程能访问队列。
  2. 使用条件变量(Condvar:当队列为空时,消费者线程等待条件变量,生产者向队列添加数据后通知条件变量,唤醒消费者线程。同理,当队列满时,生产者等待条件变量,消费者从队列取出数据后通知条件变量,唤醒生产者线程。
  3. 使用信号量(Semaphore:可以用来限制队列中元素的数量,达到队列满的效果。

代码实现

use std::sync::{Arc, Condvar, Mutex, Semaphore};
use std::thread;
use std::time::Duration;

struct SharedQueue<T> {
    queue: Vec<T>,
    capacity: usize,
}

impl<T> SharedQueue<T> {
    fn new(capacity: usize) -> Self {
        SharedQueue {
            queue: Vec::with_capacity(capacity),
            capacity,
        }
    }

    fn push(&mut self, item: T) {
        self.queue.push(item);
    }

    fn pop(&mut self) -> Option<T> {
        self.queue.pop()
    }
}

fn main() {
    let shared_queue = Arc::new((Mutex::new(SharedQueue::new(5)), Condvar::new(), Semaphore::new(5)));
    let producer_shared_queue = shared_queue.clone();
    let consumer_shared_queue = shared_queue.clone();

    let producer = thread::spawn(move || {
        for i in 0..10 {
            producer_shared_queue.2.acquire().unwrap();
            let (lock, condvar, _) = &*producer_shared_queue;
            let mut queue = lock.lock().unwrap();
            while queue.queue.len() == queue.capacity {
                queue = condvar.wait(queue).unwrap();
            }
            queue.push(i);
            println!("Producer pushed: {}", i);
            condvar.notify_one();
        }
    });

    let consumer = thread::spawn(move || {
        for _ in 0..10 {
            let (lock, condvar, semaphore) = &*consumer_shared_queue;
            let mut queue = lock.lock().unwrap();
            while queue.queue.is_empty() {
                queue = condvar.wait(queue).unwrap();
            }
            if let Some(item) = queue.pop() {
                println!("Consumer popped: {}", item);
                semaphore.release();
                condvar.notify_one();
            }
        }
    });

    producer.join().unwrap();
    consumer.join().unwrap();
}

代码解释

  1. SharedQueue 结构体:定义了一个共享队列,包含队列数据和队列容量。
  2. pushpop 方法:用于向队列添加和移除元素。
  3. 主函数
    • 创建一个 Arc 包裹的元组,包含 MutexCondvarSemaphoreMutex 保护共享队列,Condvar 用于线程间的条件等待和通知,Semaphore 用于限制队列元素数量。
    • 生产者线程:通过 Semaphore 控制队列容量,获取锁后检查队列是否已满,如果满则等待 Condvar 通知。添加元素后通知 Condvar,唤醒等待的线程。
    • 消费者线程:获取锁后检查队列是否为空,如果空则等待 Condvar 通知。取出元素后释放 Semaphore,并通知 Condvar,唤醒等待的线程。
  4. 线程的 join 操作:等待生产者和消费者线程完成。