设计思路
- 任务队列:使用无锁数据结构来存储任务,以避免多线程竞争锁带来的性能开销。例如,
crossbeam::queue::MsQueue
是一个高效的无锁队列,适合这种场景。
- 优先级管理:为每个任务分配优先级,在任务入队时按照优先级排序。可以使用优先级队列(如
PriorityQueue
来自 priority_queue
库)。
- 线程池:利用线程池来管理并发任务,减少线程创建和销毁的开销。
rayon
库提供了一个线程池实现,能有效利用多核CPU。
- 异步编程:对于控制台输入输出操作,使用异步I/O来避免阻塞线程,提高整体并发性能。
tokio
是Rust中常用的异步运行时。
关键代码片段
use crossbeam::queue::MsQueue;
use priority_queue::PriorityQueue;
use std::sync::{Arc, Mutex};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use rayon::prelude::*;
// 定义任务结构体
struct Task {
priority: u32,
// 任务具体逻辑
handler: Box<dyn FnOnce() -> Result<(), io::Error>>,
}
// 线程安全的任务队列
type TaskQueue = Arc<Mutex<PriorityQueue<Task, u32>>>;
async fn read_input() -> Result<String, io::Error> {
let mut buffer = String::new();
io::stdin().read_line(&mut buffer).await?;
Ok(buffer)
}
async fn write_output(output: &str) -> Result<(), io::Error> {
io::stdout().write_all(output.as_bytes()).await?;
io::stdout().flush().await?;
Ok(())
}
fn process_task(task: Task) {
(task.handler)().unwrap();
}
async fn worker(queue: TaskQueue) {
loop {
let task = {
let mut inner = queue.lock().unwrap();
inner.pop()
};
match task {
Some((task, _)) => process_task(task),
None => break,
}
}
}
#[tokio::main]
async fn main() {
let queue: TaskQueue = Arc::new(Mutex::new(PriorityQueue::new()));
// 模拟添加任务
let task1 = Task {
priority: 1,
handler: Box::new(|| {
let input = read_input().await.unwrap();
let output = format!("Processed: {}", input);
write_output(&output).await.unwrap();
Ok(())
}),
};
let task2 = Task {
priority: 2,
handler: Box::new(|| {
let input = read_input().await.unwrap();
let output = format!("Processed: {}", input);
write_output(&output).await.unwrap();
Ok(())
}),
};
{
let mut inner = queue.lock().unwrap();
inner.push(task1, task1.priority);
inner.push(task2, task2.priority);
}
// 创建线程池处理任务
let num_threads = num_cpus::get();
(0..num_threads).into_par_iter().for_each(|_| {
let queue_clone = queue.clone();
tokio::spawn(async move {
worker(queue_clone).await;
});
});
}
性能优化点解释
- 无锁任务队列:
MsQueue
用于线程间高效传递任务,避免锁竞争,提升并发性能。
- 优先级队列:
PriorityQueue
确保高优先级任务优先处理,符合题目要求。
- 线程池:使用
rayon
的并行迭代来创建线程池,充分利用多核CPU,减少线程创建销毁开销。
- 异步I/O:利用
tokio
的异步I/O操作,避免控制台输入输出阻塞线程,提高整体并发能力。