use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::VecDeque;
// 定义生产者 - 消费者模型的结构体
struct ProducerConsumer {
buffer: VecDeque<i32>,
capacity: usize,
cond_producer: Condvar,
cond_consumer: Condvar,
mutex: Mutex<()>,
}
impl ProducerConsumer {
fn new(capacity: usize) -> Self {
ProducerConsumer {
buffer: VecDeque::new(),
capacity,
cond_producer: Condvar::new(),
cond_consumer: Condvar::new(),
mutex: Mutex::new(()),
}
}
// 生产者方法
fn produce(&self, item: i32) {
let _lock = self.mutex.lock().unwrap();
while self.buffer.len() >= self.capacity {
let _lock = self.cond_producer.wait(_lock).unwrap();
}
self.buffer.push_back(item);
self.cond_consumer.notify_one();
}
// 消费者方法,根据条件唤醒消费
fn consume(&self, condition: fn(i32) -> bool) -> Option<i32> {
let _lock = self.mutex.lock().unwrap();
while self.buffer.is_empty() {
let _lock = self.cond_consumer.wait(_lock).unwrap();
}
let item = self.buffer.pop_front();
if let Some(item) = item {
if condition(item) {
self.cond_producer.notify_one();
Some(item)
} else {
self.buffer.push_front(item);
None
}
} else {
None
}
}
}
fn main() {
let pc = Arc::new(ProducerConsumer::new(10));
let pc_producer = pc.clone();
let producer_handle = thread::spawn(move || {
for i in 0..20 {
pc_producer.produce(i);
println!("Produced: {}", i);
}
});
let pc_consumer = pc.clone();
let consumer_handle = thread::spawn(move || {
let condition = |x| x % 2 == 0;
while let Some(item) = pc_consumer.consume(condition) {
println!("Consumed: {}", item);
}
});
producer_handle.join().unwrap();
consumer_handle.join().unwrap();
}
处理缓存溢出
- 容量限制:在
ProducerConsumer
结构体中定义了capacity
字段,表示缓存的最大容量。在produce
方法中,使用while self.buffer.len() >= self.capacity
来检查缓存是否已满。如果已满,生产者线程会通过self.cond_producer.wait(_lock)
进入等待状态,直到消费者消费了数据,通过self.cond_producer.notify_one()
唤醒生产者。
- 唤醒机制:消费者消费数据后,会调用
self.cond_producer.notify_one()
唤醒一个等待的生产者线程,这样生产者就可以继续生产数据,避免缓存一直处于满的状态。
处理线程安全
- 互斥锁:使用
Mutex
来保护共享资源buffer
。在produce
和consume
方法中,首先通过self.mutex.lock().unwrap()
获取锁,确保在同一时间只有一个线程可以访问和修改buffer
。
- 条件变量:使用
Condvar
(cond_producer
和cond_consumer
)来实现线程间的同步。生产者在缓存满时等待cond_producer
条件变量,消费者在缓存空时等待cond_consumer
条件变量。当条件满足时,通过notify_one
方法唤醒相应等待的线程。
- 所有权和引用:在多线程环境下,使用
Arc
(原子引用计数)来共享ProducerConsumer
实例,确保每个线程都可以安全地访问和操作共享资源,同时避免了所有权转移带来的问题。