1. 线程池的设计与实现
- 设计思路:
- 任务队列:使用一个队列来存储待处理的任务。可以选择链表或数组实现,链表更适合动态添加和删除任务,数组在空间利用和遍历上有优势,这里选择链表实现。
- 线程池:维护一组固定数量的线程,这些线程从任务队列中取出任务并执行。
- 线程管理:提供初始化、销毁线程池,添加任务到队列等接口。
- 核心代码框架:
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
// 任务结构体
typedef struct task {
void (*func)(void*);
void *arg;
struct task *next;
} task_t;
// 线程池结构体
typedef struct thread_pool {
task_t *head;
task_t *tail;
pthread_t *threads;
int thread_count;
int shutdown;
pthread_mutex_t mutex;
pthread_cond_t cond;
} thread_pool_t;
// 初始化线程池
thread_pool_t* create_thread_pool(int thread_count) {
thread_pool_t *pool = (thread_pool_t*)malloc(sizeof(thread_pool_t));
pool->head = NULL;
pool->tail = NULL;
pool->thread_count = thread_count;
pool->shutdown = 0;
pool->threads = (pthread_t*)malloc(thread_count * sizeof(pthread_t));
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->cond, NULL);
for (int i = 0; i < thread_count; i++) {
pthread_create(&pool->threads[i], NULL, (void*(*)(void*))worker, pool);
}
return pool;
}
// 工作线程函数
void* worker(void *arg) {
thread_pool_t *pool = (thread_pool_t*)arg;
task_t *task;
while (1) {
pthread_mutex_lock(&pool->mutex);
while (pool->head == NULL &&!pool->shutdown) {
pthread_cond_wait(&pool->cond, &pool->mutex);
}
if (pool->shutdown && pool->head == NULL) {
pthread_mutex_unlock(&pool->mutex);
pthread_exit(NULL);
}
task = pool->head;
pool->head = task->next;
if (pool->head == NULL) {
pool->tail = NULL;
}
pthread_mutex_unlock(&pool->mutex);
(*(task->func))(task->arg);
free(task);
}
return NULL;
}
// 添加任务到线程池
void add_task(thread_pool_t *pool, void (*func)(void*), void *arg) {
task_t *new_task = (task_t*)malloc(sizeof(task_t));
new_task->func = func;
new_task->arg = arg;
new_task->next = NULL;
pthread_mutex_lock(&pool->mutex);
if (pool->tail == NULL) {
pool->head = new_task;
pool->tail = new_task;
} else {
pool->tail->next = new_task;
pool->tail = new_task;
}
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
}
// 销毁线程池
void destroy_thread_pool(thread_pool_t *pool) {
pthread_mutex_lock(&pool->mutex);
pool->shutdown = 1;
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
for (int i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
task_t *current = pool->head;
task_t *next;
while (current != NULL) {
next = current->next;
free(current);
current = next;
}
free(pool->threads);
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond);
free(pool);
}
2. 线程调度策略的选择
- SCHED_OTHER:这是默认策略,适用于一般的分时调度,内核通过时间片轮转方式调度线程,适合I/O和计算混合型任务。但对于高并发海量数据处理场景,可能会因频繁上下文切换影响性能。
- SCHED_RR:基于时间片轮转的实时调度策略,每个线程被分配固定时间片,时间片用完后进入队列尾部等待下次调度。适合对响应时间敏感的任务,但如果线程数量过多,时间片分配过于细碎,会导致上下文切换开销增大。
- SCHED_FIFO:先进先出实时调度策略,线程按照进入调度队列的顺序执行,直到主动放弃CPU或被更高优先级线程抢占。适用于对实时性要求极高,且执行时间较短的任务。在海量数据处理场景中,若使用该策略,可能会导致某些线程长时间占用CPU,其他线程饥饿。
- 选择建议:对于计算和I/O混合且对响应时间有一定要求的场景,可优先尝试SCHED_RR,并根据实际性能测试调整时间片大小;若任务中有部分对实时性要求极高的计算任务,可对这些任务的线程使用SCHED_FIFO,同时注意避免线程饥饿问题。
3. 减少上下文切换
- 优化线程数量:合理设置线程池中的线程数量,避免线程过多导致频繁上下文切换。可以通过测试不同线程数量下的性能,找到最优值。一般可参考CPU核心数,对于计算密集型任务,线程数可设置为CPU核心数;对于I/O密集型任务,线程数可适当增大,但不宜过大。
- 数据局部性:尽量让线程处理的数据在内存中具有局部性,减少缓存不命中。例如,将相关数据存储在连续内存区域,使线程在处理数据时能充分利用CPU缓存,减少内存访问开销,间接减少上下文切换频率。
- 避免不必要的锁竞争:锁的竞争会导致线程阻塞和上下文切换。在设计数据结构和同步机制时,尽量减少锁的粒度和持有时间。例如,使用读写锁(pthread_rwlock)代替互斥锁,对于读多写少的场景,能显著减少锁竞争。
4. 处理线程间复杂依赖关系
- 使用条件变量(pthread_cond_t):
- 设计思路:创建一个条件变量和一个互斥锁。在依赖其他线程的线程中,使用条件变量等待特定条件满足(例如特定线程完成任务)。在被依赖的线程完成任务后,通过条件变量通知等待的线程。
- 核心代码框架:
pthread_mutex_t dep_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t dep_cond = PTHREAD_COND_INITIALIZER;
int task_completed = 0;
// 被依赖线程
void* dependent_thread(void *arg) {
// 执行任务
//...
pthread_mutex_lock(&dep_mutex);
task_completed = 1;
pthread_cond_signal(&dep_cond);
pthread_mutex_unlock(&dep_mutex);
return NULL;
}
// 依赖线程
void* depending_thread(void *arg) {
pthread_mutex_lock(&dep_mutex);
while (!task_completed) {
pthread_cond_wait(&dep_cond, &dep_mutex);
}
pthread_mutex_unlock(&dep_mutex);
// 执行依赖任务
//...
return NULL;
}
- 使用屏障(pthread_barrier_t):如果多个线程需要等待所有线程都完成某一阶段任务后再继续执行下一阶段,可使用屏障。
- 设计思路:初始化一个屏障,设置需要等待的线程数量。每个线程执行到屏障点时,调用pthread_barrier_wait函数,所有线程都到达屏障点后,屏障解除,所有线程继续执行。
- 核心代码框架:
pthread_barrier_t barrier;
// 初始化屏障
pthread_barrier_init(&barrier, NULL, num_threads);
// 线程函数
void* thread_func(void *arg) {
// 执行第一阶段任务
//...
pthread_barrier_wait(&barrier);
// 执行第二阶段任务
//...
return NULL;
}