线程池整体架构
- 任务队列:用于存储待处理的任务,需要支持线程安全的插入和取出操作。
- 线程集合:管理线程池中的线程,线程在启动后会不断从任务队列中获取任务并执行。
- 任务调度器:负责将任务插入任务队列,并根据任务优先级进行排序。
- 线程控制器:动态调整线程池中的线程数量,根据任务队列的长度和系统负载等因素决定是否创建新线程或销毁闲置线程。
关键数据结构
- 任务结构体:
struct Task {
std::function<void()> func;
int priority;
Task(std::function<void()> f, int p) : func(f), priority(p) {}
};
- 任务队列:使用
std::priority_queue
存储任务,利用std::mutex
和std::condition_variable
保证线程安全。
class SafePriorityQueue {
private:
std::priority_queue<Task, std::vector<Task>,
std::function<bool(const Task&, const Task&)>> tasks;
mutable std::mutex mtx;
std::condition_variable cond;
public:
SafePriorityQueue() : tasks([](const Task& a, const Task& b) {
return a.priority < b.priority;
}) {}
void push(Task task) {
std::unique_lock<std::mutex> lock(mtx);
tasks.push(task);
lock.unlock();
cond.notify_one();
}
bool pop(Task& task) {
std::unique_lock<std::mutex> lock(mtx);
cond.wait(lock, [this] { return!tasks.empty(); });
if (tasks.empty()) return false;
task = tasks.top();
tasks.pop();
return true;
}
};
- 线程池类:
class ThreadPool {
private:
std::vector<std::thread> threads;
SafePriorityQueue taskQueue;
std::atomic<bool> stop;
std::mutex mtx;
int minThreads;
int maxThreads;
int currentThreads;
public:
ThreadPool(int min, int max) : minThreads(min), maxThreads(max), currentThreads(min), stop(false) {
for (int i = 0; i < minThreads; ++i) {
threads.emplace_back([this] {
while (!stop) {
Task task;
if (taskQueue.pop(task)) {
task.func();
} else {
std::this_thread::yield();
}
}
});
}
}
~ThreadPool() {
stop = true;
for (std::thread& thread : threads) {
thread.join();
}
}
void enqueue(std::function<void()> func, int priority) {
taskQueue.push(Task(func, priority));
adjustThreadCount();
}
void adjustThreadCount() {
std::unique_lock<std::mutex> lock(mtx);
if (taskQueue.size() > (currentThreads - minThreads) * 2 && currentThreads < maxThreads) {
threads.emplace_back([this] {
while (!stop) {
Task task;
if (taskQueue.pop(task)) {
task.func();
} else {
std::this_thread::yield();
}
}
});
++currentThreads;
} else if (taskQueue.empty() && currentThreads > minThreads) {
// 这里简单处理为等待一段时间后销毁线程
std::this_thread::sleep_for(std::chrono::seconds(5));
if (taskQueue.empty() && currentThreads > minThreads) {
threads.back().detach();
threads.pop_back();
--currentThreads;
}
}
}
};
实现的主要步骤
- 初始化线程池:创建一定数量的初始线程,并将它们添加到线程集合中。
- 任务入队:将任务封装成
Task
对象并插入任务队列,同时根据任务队列长度调整线程数量。
- 线程执行任务:线程不断从任务队列中取出任务并执行,直到线程池停止。
- 动态调整线程数量:根据任务队列的长度和当前线程数量,决定是否创建新线程或销毁闲置线程。
可能存在的性能瓶颈及优化
- 任务队列竞争:多个线程同时访问任务队列可能导致锁竞争。优化方法:使用无锁数据结构(如无锁队列),或者采用分段锁机制,减少锁的粒度。
- 线程创建和销毁开销:频繁创建和销毁线程会带来较大开销。优化方法:采用线程复用机制,减少不必要的线程创建和销毁;设置合理的线程数量上下限,避免过度调整。
- 线程调度开销:过多的线程会导致线程调度开销增大。优化方法:根据系统的CPU核心数和负载情况,动态调整线程数量,避免线程过多。
- 优先级队列排序开销:每次插入任务时,
std::priority_queue
可能需要进行排序。优化方法:使用更高效的优先级队列实现,如跳表或斐波那契堆,以降低插入和删除操作的时间复杂度。