面试题答案
一键面试设计思路
- 数据结构选择:
- 任务队列:使用
PriorityQueue
(可以基于std::collections::BinaryHeap
实现一个带有优先级的队列)来存储任务。每个任务包含优先级和具体的执行逻辑。这样可以保证高优先级的任务总是在队列头部,方便快速取出执行。 - 线程池:使用
Vec<Thread>
来管理线程。每个线程从任务队列中取出任务并执行。
- 任务队列:使用
- 线程间同步方式:
- 互斥锁(Mutex):为了保护任务队列,防止多个线程同时访问和修改任务队列,在访问任务队列时使用
Mutex
进行加锁。例如,在往任务队列添加任务和从任务队列取出任务时,都需要先获取Mutex
的锁。 - 条件变量(Condvar):结合
Mutex
使用Condvar
来实现线程的等待和唤醒机制。当任务队列中没有任务时,线程可以通过Condvar
进入等待状态,当有新任务添加到任务队列时,通过Condvar
唤醒等待的线程。
- 互斥锁(Mutex):为了保护任务队列,防止多个线程同时访问和修改任务队列,在访问任务队列时使用
- 性能和资源消耗权衡:
- 性能:为了提高性能,线程池的大小需要根据系统的CPU核心数、内存等资源合理设置。如果线程数过少,可能无法充分利用系统资源;如果线程数过多,会增加线程上下文切换的开销。同时,使用高效的优先级队列数据结构可以快速地获取高优先级任务,减少任务等待时间。
- 资源消耗:控制线程池大小可以避免过多的线程占用大量内存等资源。并且在任务处理完后,及时释放相关资源,比如关闭不再使用的线程等。
高并发场景下可能遇到的问题及解决方案
- 死锁:
- 问题描述:多个线程相互等待对方释放锁,导致程序无法继续执行。例如,线程A持有锁1,等待锁2,而线程B持有锁2,等待锁1。
- 解决方案:使用锁的顺序策略,规定所有线程获取锁的顺序。比如按照锁的ID从小到大获取锁,这样可以避免循环等待导致的死锁。同时,在代码编写时,仔细检查锁的获取和释放逻辑,确保不会出现死锁的情况。
- 饥饿:
- 问题描述:低优先级任务长时间得不到执行,因为高优先级任务不断进入队列,导致低优先级任务一直处于等待状态。
- 解决方案:可以采用老化策略,随着任务在队列中等待时间的增加,逐渐提升其优先级。这样可以保证低优先级任务在等待一定时间后,也有机会得到执行。
- 线程上下文切换开销:
- 问题描述:过多的线程切换会消耗大量的CPU时间,降低系统性能。
- 解决方案:合理设置线程池大小,避免创建过多线程。同时,可以采用工作窃取算法,让空闲线程从繁忙线程的任务队列中窃取任务执行,减少线程上下文切换的频率,提高系统整体性能。
示例代码框架
use std::sync::{Arc, Mutex, Condvar};
use std::thread;
use std::collections::BinaryHeap;
// 定义任务结构体,包含优先级和执行逻辑
struct Task {
priority: u32,
func: Box<dyn FnOnce() + Send>,
}
impl Ord for Task {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other.priority.cmp(&self.priority)
}
}
impl PartialOrd for Task {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Eq for Task {}
impl PartialEq for Task {
fn eq(&self, other: &Self) -> bool {
self.priority == other.priority
}
}
struct ThreadPool {
workers: Vec<thread::JoinHandle<()>>,
task_queue: Arc<(Mutex<BinaryHeap<Task>>, Condvar)>,
stop: Arc<Mutex<bool>>,
}
impl ThreadPool {
fn new(size: usize) -> Self {
let task_queue = Arc::new((Mutex::new(BinaryHeap::new()), Condvar::new()));
let stop = Arc::new(Mutex::new(false));
let mut workers = Vec::with_capacity(size);
for _ in 0..size {
let task_queue_clone = task_queue.clone();
let stop_clone = stop.clone();
let handle = thread::spawn(move || {
loop {
let (lock, cvar) = &*task_queue_clone;
let mut queue = lock.lock().unwrap();
while queue.is_empty() &&!*stop_clone.lock().unwrap() {
queue = cvar.wait(queue).unwrap();
}
if *stop_clone.lock().unwrap() && queue.is_empty() {
break;
}
let task = queue.pop().unwrap();
drop(queue);
(task.func)();
}
});
workers.push(handle);
}
ThreadPool {
workers,
task_queue,
stop,
}
}
fn submit(&self, priority: u32, func: Box<dyn FnOnce() + Send>) {
let (lock, cvar) = &*self.task_queue;
let mut queue = lock.lock().unwrap();
queue.push(Task { priority, func });
drop(queue);
cvar.notify_one();
}
fn shutdown(self) {
*self.stop.lock().unwrap() = true;
let (_, cvar) = &*self.task_queue;
cvar.notify_all();
for worker in self.workers {
worker.join().unwrap();
}
}
}
以上代码只是一个简单的线程池实现框架,实际应用中可能需要根据具体需求进一步完善和优化,比如错误处理、任务取消等功能。