代码实现
use std::sync::{Arc, Mutex};
use std::thread;
use std::sync::mpsc::{channel, Receiver, Sender};
// 任务结构体
struct Task {
// 假设任务有个简单的标识
id: u32,
}
fn main() {
// 创建通道
let (tx, rx): (Sender<Task>, Receiver<Task>) = channel();
// 使用Arc和Mutex来包装共享队列
let shared_queue = Arc::new(Mutex::new(vec![]));
// 启动工作线程
let mut handles = vec![];
for _ in 0..4 {
let shared_queue_clone = Arc::clone(&shared_queue);
let rx_clone = rx.clone();
let handle = thread::spawn(move || {
loop {
let task = match rx_clone.recv() {
Ok(task) => task,
Err(_) => break,
};
// 模拟任务处理
println!("Thread {:?} is processing task {:?}", thread::current().id(), task.id);
// 任务处理完后从共享队列移除
let mut queue = shared_queue_clone.lock().unwrap();
queue.retain(|t| t.id != task.id);
}
});
handles.push(handle);
}
// 主线程添加任务
for i in 0..10 {
let task = Task { id: i };
// 将任务添加到共享队列
let mut queue = shared_queue.lock().unwrap();
queue.push(task.clone());
// 通过通道通知工作线程有新任务
tx.send(task).unwrap();
}
// 关闭通道,通知工作线程结束
drop(tx);
// 等待所有工作线程结束
for handle in handles {
handle.join().unwrap();
}
}
竞争条件及避免方法
- 竞争条件:
- 数据竞争:多个工作线程同时访问和修改共享队列时,可能会导致数据不一致。例如,一个线程正在读取队列中的任务,另一个线程同时在修改队列结构(添加或删除任务)。
- 死锁:如果多个线程在获取锁的顺序上不一致,可能会导致死锁。比如线程A获取了锁1,等待锁2,而线程B获取了锁2,等待锁1。
- 避免方法:
- 使用Mutex:在访问共享队列时,通过
Mutex
来锁定队列,确保同一时间只有一个线程可以访问和修改队列,从而避免数据竞争。如上述代码中,通过shared_queue.lock().unwrap()
获取锁来访问和修改共享队列。
- 合理的锁顺序:在涉及多个锁的情况下,确保所有线程以相同的顺序获取锁,以避免死锁。在本场景中,由于只有一个共享队列(一个锁),不存在这种死锁风险,但在更复杂场景中需要注意。
- 使用通道:通过通道来传递任务通知,减少直接对共享队列的频繁访问,降低竞争风险。工作线程通过通道接收任务,主线程通过通道发送任务,这样可以避免多个线程同时操作共享队列的频率。