use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
// 定义共享队列的最大容量
const QUEUE_CAPACITY: usize = 5;
fn main() {
let queue = Arc::new((Mutex::new(Vec::with_capacity(QUEUE_CAPACITY)), Condvar::new()));
let producer_queue = queue.clone();
let consumer_queue = queue.clone();
// 创建生产者线程
let producer = thread::spawn(move || {
let (lock, cvar) = &*producer_queue;
let mut data = 0;
loop {
let mut queue = lock.lock().unwrap();
while queue.len() == QUEUE_CAPACITY {
queue = cvar.wait(queue).unwrap();
}
queue.push(data);
println!("Produced: {}", data);
data += 1;
cvar.notify_one();
}
});
// 创建消费者线程
let consumer = thread::spawn(move || {
let (lock, cvar) = &*consumer_queue;
loop {
let mut queue = lock.lock().unwrap();
while queue.is_empty() {
queue = cvar.wait(queue).unwrap();
}
let item = queue.remove(0);
println!("Consumed: {}", item);
cvar.notify_one();
// 模拟处理数据的时间
thread::sleep(Duration::from_millis(100));
}
});
// 等待线程结束(实际上这里的线程不会结束,只是为了防止主线程提前退出)
producer.join().unwrap();
consumer.join().unwrap();
}
关键步骤解释:
- 定义共享队列和条件变量:
- 使用
Arc<(Mutex<Vec<T>>, Condvar)>
来创建一个线程安全的共享队列和条件变量。Arc
用于在多个线程间共享所有权,Mutex
用于保护共享队列,Condvar
用于线程间的同步。
QUEUE_CAPACITY
定义了共享队列的最大容量。
- 生产者线程:
- 获取锁并进入临界区,使用
lock.lock().unwrap()
。
- 当队列满时(
queue.len() == QUEUE_CAPACITY
),调用 cvar.wait(queue).unwrap()
释放锁并等待条件变量通知。这一步保证了生产者线程不会一直占用锁,避免死锁和线程饥饿。
- 生成数据并放入队列,然后使用
cvar.notify_one()
通知等待的消费者线程。
- 消费者线程:
- 同样获取锁并进入临界区。
- 当队列空时(
queue.is_empty()
),调用 cvar.wait(queue).unwrap()
释放锁并等待条件变量通知。
- 从队列中取出数据并处理,然后使用
cvar.notify_one()
通知等待的生产者线程。
- 线程创建与等待:
- 使用
thread::spawn
创建生产者和消费者线程。
- 使用
join
方法等待线程结束(在实际应用中,这些线程可能是长期运行的,这里只是为了防止主线程提前退出)。