- 思路:
- 使用
std::sync::mpsc
进行消息传递,mpsc
(Multiple Producer, Single Consumer)通道可以方便地在多线程间传递数据。
- 对于数据处理顺序要求,我们可以使用
Mutex
来保护共享状态,用Condvar
来进行条件等待。例如,我们可以维护一个数据处理状态的映射,消费者线程在处理数据前检查相关依赖数据是否已处理完成,如果未完成则等待。
- 关键代码片段及解释:
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::sync::mpsc::{channel, Sender};
// 定义数据类型
struct Data {
id: u32,
// 假设这里有其他数据字段
}
// 假设我们有一个数据处理依赖关系的结构体
struct Dependency {
data_id: u32,
depends_on: Vec<u32>,
}
fn main() {
let (tx, rx) = channel();
let shared_state = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
// 生产者线程
let producer_tx = tx.clone();
thread::spawn(move || {
let data1 = Data { id: 1 };
let data2 = Data { id: 2 };
producer_tx.send(data1).unwrap();
producer_tx.send(data2).unwrap();
});
// 消费者线程
let consumer_shared_state = shared_state.clone();
thread::spawn(move || {
let (lock, cvar) = &*consumer_shared_state;
let mut processed_ids = lock.lock().unwrap();
loop {
match rx.recv() {
Ok(data) => {
// 假设数据2依赖数据1处理完成
let dependency = Dependency { data_id: data.id, depends_on: if data.id == 2 { vec![1] } else { vec![] } };
let should_wait = dependency.depends_on.iter().any(|dep_id|!processed_ids.contains(dep_id));
if should_wait {
processed_ids = cvar.wait(processed_ids).unwrap();
}
// 处理数据
println!("Processing data with id: {}", data.id);
processed_ids.push(data.id);
},
Err(_) => {
break;
}
}
}
});
}
- 通道部分:
let (tx, rx) = channel();
创建了一个mpsc
通道,tx
用于发送数据(生产者使用),rx
用于接收数据(消费者使用)。
- 共享状态部分:
let shared_state = Arc::new((Mutex::new(Vec::new()), Condvar::new()));
创建了一个共享状态,其中包含一个Mutex
保护的Vec<u32>
(用于记录已处理的数据ID)和一个Condvar
。Arc
用于在多线程间共享这个状态。
- 生产者线程:
let producer_tx = tx.clone();
克隆发送端以便在新线程中使用。
producer_tx.send(data1).unwrap();
和 producer_tx.send(data2).unwrap();
发送数据到通道。
- 消费者线程:
let (lock, cvar) = &*consumer_shared_state;
从共享状态中获取Mutex
和Condvar
。
let mut processed_ids = lock.lock().unwrap();
获取锁,从而可以操作共享的已处理数据ID列表。
- 在接收到数据后,检查数据的依赖关系,如果依赖数据未处理,则使用
cvar.wait(processed_ids).unwrap();
等待,直到依赖数据被处理(Condvar
会自动释放锁并在被唤醒时重新获取锁)。
- 处理完数据后,将数据ID加入已处理列表。