use std::sync::{Arc, Condvar, Mutex};
use std::thread;
// 定义缓冲区结构
struct Buffer {
data: Vec<i32>,
head: usize,
tail: usize,
count: usize,
}
impl Buffer {
// 初始化缓冲区
fn new(capacity: usize) -> Buffer {
Buffer {
data: vec![0; capacity],
head: 0,
tail: 0,
count: 0,
}
}
// 向缓冲区写入数据
fn write(&mut self, value: i32) {
self.data[self.tail] = value;
self.tail = (self.tail + 1) % self.data.len();
self.count += 1;
}
// 从缓冲区读取数据
fn read(&mut self) -> i32 {
let value = self.data[self.head];
self.head = (self.head + 1) % self.data.len();
self.count -= 1;
value
}
}
fn main() {
let buffer = Arc::new((Mutex::new(Buffer::new(5)), Condvar::new()));
let producer_buffer = buffer.clone();
let consumer_buffer = buffer.clone();
// 创建生产者线程
let producer = thread::spawn(move || {
for i in 0..10 {
let (lock, cvar) = &*producer_buffer;
let mut buffer = lock.lock().unwrap();
// 等待缓冲区有空间
while buffer.count == buffer.data.len() {
buffer = cvar.wait(buffer).unwrap();
}
println!("Producer produced: {}", i);
buffer.write(i);
// 通知消费者缓冲区有数据
cvar.notify_one();
}
});
// 创建消费者线程
let consumer = thread::spawn(move || {
for _ in 0..10 {
let (lock, cvar) = &*consumer_buffer;
let mut buffer = lock.lock().unwrap();
// 等待缓冲区有数据
while buffer.count == 0 {
buffer = cvar.wait(buffer).unwrap();
}
let value = buffer.read();
println!("Consumer consumed: {}", value);
// 通知生产者缓冲区有空间
cvar.notify_one();
}
});
// 等待生产者和消费者线程结束
producer.join().unwrap();
consumer.join().unwrap();
}
关键部分注释:
Buffer
结构体:定义了一个固定大小的缓冲区,包含数据存储的Vec<i32>
,以及用于跟踪读写位置的head
、tail
和当前数据数量的count
。
write
和read
方法:实现了向缓冲区写入数据和从缓冲区读取数据的逻辑,并更新head
、tail
和count
。
- 生产者线程:
- 使用
Mutex
获取缓冲区锁。
- 当缓冲区满时,通过
Condvar
的wait
方法等待,直到被通知有空间。
- 生产数据并写入缓冲区,然后通过
Condvar
的notify_one
方法通知消费者。
- 消费者线程:
- 使用
Mutex
获取缓冲区锁。
- 当缓冲区空时,通过
Condvar
的wait
方法等待,直到被通知有数据。
- 从缓冲区读取数据并处理,然后通过
Condvar
的notify_one
方法通知生产者。
main
函数:创建了共享的缓冲区,并分别创建生产者和消费者线程,最后等待两个线程结束。