use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use rand::Rng;
fn main() {
let (tx, rx): (Sender<i32>, Receiver<i32>) = channel();
// 生产者线程
let num_producers = 3;
let producer_handles: Vec<_> = (0..num_producers).map(|_| {
let tx = tx.clone();
thread::spawn(move || {
let mut rng = rand::thread_rng();
loop {
let num = rng.gen_range(1..100);
match tx.send(num) {
Ok(_) => (),
Err(_) => break,
}
}
})
}).collect();
// 消费者线程
let num_consumers = 2;
let consumer_handles: Vec<_> = (0..num_consumers).map(|_| {
let rx = rx.clone();
thread::spawn(move || {
for num in rx {
let square = num * num;
println!("Consumer: Squared value of {} is {}", num, square);
}
})
}).collect();
// 等待生产者线程完成
for handle in producer_handles {
handle.join().unwrap();
}
// 关闭通道,通知消费者线程结束
drop(tx);
// 等待消费者线程完成
for handle in consumer_handles {
handle.join().unwrap();
}
}
线程安全和高效通信的保证
- 线程安全:
- 使用
std::sync::mpsc::channel
创建的通道本身就是线程安全的。Sender
和Receiver
类型确保只有拥有Sender
的线程可以发送数据,只有拥有Receiver
的线程可以接收数据,避免了数据竞争。
- 每个生产者线程和消费者线程都有自己独立的状态,生产者线程通过克隆的
Sender
发送数据,消费者线程通过克隆的Receiver
接收数据,这进一步保证了线程安全。
- 高效通信:
- 通道的设计允许线程之间直接传递数据,不需要额外的锁机制来保护共享数据,减少了同步开销。
- 生产者线程持续生成数据并通过通道发送,消费者线程通过
for num in rx
循环从通道接收数据,这种设计保证了数据的高效流动。当通道缓冲区满时,生产者线程会阻塞,直到有消费者从通道中取走数据,从而避免了过度生产。同样,当通道缓冲区为空时,消费者线程会阻塞,直到有生产者发送数据。
- 通过
drop(tx)
关闭发送端通道,通知消费者线程没有更多数据,消费者线程的for
循环会自然结束,这种方式高效地处理了通道关闭和线程的正常退出。