空闲线程处理策略
- 分级线程池:将任务按执行时间大致分为长任务和短任务,分别使用不同的线程池处理。这样可以避免长任务阻塞短任务的执行。
- 动态线程调整:根据任务队列的长度和系统负载动态调整线程池中的线程数量。当任务队列较长且系统资源充足时,增加线程数量;当任务队列较短且空闲线程较多时,减少线程数量,但要保证一定数量的最小线程数,避免频繁创建和销毁线程。
- 任务优先级调度:为任务分配优先级,线程优先处理高优先级任务。
代码层面实现
数据结构设计
- 任务结构体:
typedef struct Task {
void (*func)(void*);
void *arg;
int priority; // 任务优先级
struct Task *next;
} Task;
- 线程池结构体:
typedef struct ThreadPool {
Task *head;
Task *tail;
pthread_t *threads;
int thread_count;
int running_thread_count;
int min_thread_count;
int max_thread_count;
pthread_mutex_t mutex;
pthread_cond_t cond;
int shutdown;
} ThreadPool;
线程调度逻辑
- 初始化线程池:
ThreadPool* createThreadPool(int min_count, int max_count) {
ThreadPool *pool = (ThreadPool*)malloc(sizeof(ThreadPool));
pool->head = NULL;
pool->tail = NULL;
pool->thread_count = 0;
pool->running_thread_count = 0;
pool->min_thread_count = min_count;
pool->max_thread_count = max_count;
pool->threads = (pthread_t*)malloc(max_count * sizeof(pthread_t));
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->cond, NULL);
pool->shutdown = 0;
for (int i = 0; i < min_count; ++i) {
pthread_create(&pool->threads[i], NULL, worker, pool);
pool->thread_count++;
pool->running_thread_count++;
}
return pool;
}
- 工作线程函数:
void* worker(void* arg) {
ThreadPool *pool = (ThreadPool*)arg;
while (1) {
Task *task = NULL;
pthread_mutex_lock(&pool->mutex);
while (pool->head == NULL &&!pool->shutdown) {
pthread_cond_wait(&pool->cond, &pool->mutex);
}
if (pool->shutdown && pool->head == NULL) {
pool->thread_count--;
pthread_mutex_unlock(&pool->mutex);
pthread_exit(NULL);
}
task = pool->head;
pool->head = task->next;
if (pool->head == NULL) {
pool->tail = NULL;
}
pool->running_thread_count--;
pthread_mutex_unlock(&pool->mutex);
(*task->func)(task->arg);
free(task);
pool->running_thread_count++;
}
return NULL;
}
- 添加任务函数:
void addTask(ThreadPool *pool, void (*func)(void*), void *arg, int priority) {
Task *new_task = (Task*)malloc(sizeof(Task));
new_task->func = func;
new_task->arg = arg;
new_task->priority = priority;
new_task->next = NULL;
pthread_mutex_lock(&pool->mutex);
if (pool->tail == NULL) {
pool->head = new_task;
pool->tail = new_task;
} else {
Task *current = pool->head;
Task *prev = NULL;
while (current && current->priority >= priority) {
prev = current;
current = current->next;
}
if (prev == NULL) {
new_task->next = pool->head;
pool->head = new_task;
} else {
new_task->next = current;
prev->next = new_task;
}
if (current == NULL) {
pool->tail = new_task;
}
}
if (pool->running_thread_count < pool->thread_count && pool->head != NULL) {
pthread_cond_signal(&pool->cond);
} else if (pool->thread_count < pool->max_thread_count && pool->head != NULL) {
pthread_create(&pool->threads[pool->thread_count], NULL, worker, pool);
pool->thread_count++;
pool->running_thread_count++;
pthread_cond_signal(&pool->cond);
}
pthread_mutex_unlock(&pool->mutex);
}
- 销毁线程池:
void destroyThreadPool(ThreadPool *pool) {
pthread_mutex_lock(&pool->mutex);
pool->shutdown = 1;
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
for (int i = 0; i < pool->thread_count; ++i) {
pthread_join(pool->threads[i], NULL);
}
free(pool->threads);
Task *current = pool->head;
Task *next;
while (current) {
next = current->next;
free(current);
current = next;
}
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond);
free(pool);
}
同步机制
- 互斥锁:使用
pthread_mutex_t
来保护共享数据,如任务队列、线程池状态等。在访问这些共享数据前加锁,访问完后解锁。
- 条件变量:使用
pthread_cond_t
来实现线程间的同步。当任务队列中有新任务时,通过pthread_cond_signal
或pthread_cond_broadcast
通知等待的线程;线程在任务队列为空时,通过pthread_cond_wait
等待条件变量。