use std::sync::Arc;
use std::thread;
use std::collections::VecDeque;
use std::sync::Mutex;
use std::sync::mpsc::channel;
use std::sync::mpsc::Sender;
use std::sync::mpsc::Receiver;
use num::Float;
use num::FloatConst;
struct BigData {
chunks: Vec<Vec<u32>>
}
fn complex_calculation(num: u32) -> f64 {
let num_f64 = num as f64;
let sqrt_result = num_f64.sqrt();
sqrt_result.ln()
}
fn parallel_processing(big_data: &BigData) -> Vec<f64> {
let num_threads = num_cpus::get();
let chunk_size = (big_data.chunks.len() + num_threads - 1) / num_threads;
let mut results = Vec::with_capacity(big_data.chunks.iter().flatten().count());
let data = Arc::new(Mutex::new(big_data.chunks.clone()));
let (sender, receiver): (Sender<f64>, Receiver<f64>) = channel();
let mut handles = Vec::new();
for i in 0..num_threads {
let data_clone = Arc::clone(&data);
let sender_clone = sender.clone();
let start = i * chunk_size;
let end = (i + 1) * chunk_size;
let handle = thread::spawn(move || {
let mut local_results = Vec::new();
let data = data_clone.lock().unwrap();
for chunk in &data[start..std::cmp::min(end, data.len())] {
for num in chunk {
local_results.push(complex_calculation(*num));
}
}
for result in local_results {
sender_clone.send(result).unwrap();
}
});
handles.push(handle);
}
for _ in 0..num_threads {
while let Ok(result) = receiver.try_recv() {
results.push(result);
}
}
for handle in handles {
handle.join().unwrap();
}
results
}
设计解释
- 内存管理:
- 使用
Arc<Mutex<Vec<Vec<u32>>>>
来管理共享数据 BigData::chunks
。Arc
提供了引用计数的堆分配数据结构,使得多个线程可以安全地共享数据。Mutex
用于线程同步,保护数据不被同时修改。
- 在并行处理过程中,每个线程获取
Mutex
的锁,读取其分配的数据块,处理完后释放锁,避免了数据竞争。
- 并行处理:
- 使用
std::thread::spawn
创建多个线程并行处理数据块。
- 根据 CPU 核心数将数据划分为多个数据块,每个线程处理一个数据块,提高了处理效率。
- 使用
std::sync::mpsc::channel
来收集每个线程的计算结果,避免了在每个线程中直接向共享结果向量中写入数据,减少了锁竞争。
- 高效遍历:
- 通过按数据块分配任务给不同线程,减少了线程间的依赖和数据共享冲突,提高了并行效率。
- 由于
Vec<u32>
中的数据在内存中是连续存储的,遍历和处理数据时具有较好的缓存局部性,减少了内存访问开销。