use std::sync::{Arc, Condvar, Mutex};
use std::thread;
struct SharedQueue<T> {
queue: Vec<T>,
capacity: usize,
}
impl<T> SharedQueue<T> {
fn new(capacity: usize) -> Self {
SharedQueue {
queue: Vec::new(),
capacity,
}
}
}
fn main() {
let shared_queue = Arc::new((Mutex::new(SharedQueue::new(5)), Condvar::new()));
let producer_handles: Vec<_> = (0..2).map(|i| {
let shared_queue = shared_queue.clone();
thread::spawn(move || {
let (lock, cvar) = &*shared_queue;
for j in 0..5 {
let mut queue = lock.lock().unwrap();
while queue.queue.len() >= queue.capacity {
queue = cvar.wait(queue).unwrap();
}
queue.queue.push(j * 10 + i);
println!("Producer {} added {}", i, j * 10 + i);
cvar.notify_one();
}
})
}).collect();
let consumer_handles: Vec<_> = (0..2).map(|i| {
let shared_queue = shared_queue.clone();
thread::spawn(move || {
let (lock, cvar) = &*shared_queue;
for _ in 0..5 {
let mut queue = lock.lock().unwrap();
while queue.queue.is_empty() {
queue = cvar.wait(queue).unwrap();
}
let item = queue.queue.remove(0);
println!("Consumer {} removed {}", i, item);
cvar.notify_one();
}
})
}).collect();
for handle in producer_handles {
handle.join().unwrap();
}
for handle in consumer_handles {
handle.join().unwrap();
}
}
代码解释
- 定义共享队列结构体:
SharedQueue
结构体包含一个 Vec<T>
用于存储数据,以及一个 usize
类型的 capacity
表示队列的最大容量。
new
方法用于初始化 SharedQueue
,创建一个空的 Vec
并设置容量。
- 主线程部分:
- 创建共享资源:
- 使用
Arc
来跨线程共享 (Mutex<SharedQueue<T>>, Condvar)
元组。Arc
允许在多个线程间共享数据,Mutex
用于保护 SharedQueue
,Condvar
用于线程间的条件变量同步。
- 生产者线程:
- 循环创建生产者线程,每个生产者线程向队列中添加5个数据。
- 获取
Mutex
的锁 lock.lock().unwrap()
,如果队列已满 queue.queue.len() >= queue.capacity
,则调用 cvar.wait(queue).unwrap()
等待条件变量通知,同时释放锁,使得其他线程可以操作队列。
- 当被唤醒且队列不满时,向队列中添加数据,并通过
cvar.notify_one()
唤醒一个等待的线程(可能是消费者线程)。
- 消费者线程:
- 循环创建消费者线程,每个消费者线程从队列中取出5个数据。
- 获取
Mutex
的锁 lock.lock().unwrap()
,如果队列为空 queue.queue.is_empty()
,则调用 cvar.wait(queue).unwrap()
等待条件变量通知,同时释放锁,使得其他线程可以操作队列。
- 当被唤醒且队列不为空时,从队列中取出数据,并通过
cvar.notify_one()
唤醒一个等待的线程(可能是生产者线程)。
- 等待线程结束:
- 使用
join
方法等待所有生产者和消费者线程完成任务。
原理
- Mutex:互斥锁用于保护共享资源
SharedQueue
,确保同一时间只有一个线程可以访问队列,防止数据竞争。
- Condvar:条件变量用于线程间的同步。当队列满或空时,相应的生产者或消费者线程通过
wait
方法等待,同时释放 Mutex
的锁。当有数据添加或移除队列时,通过 notify_one
方法唤醒一个等待的线程,被唤醒的线程重新获取 Mutex
的锁后继续执行。