1. 设计思路
- 线程池初始化:确定线程池中线程的数量,创建线程并将其设置为等待任务状态。
- 任务队列管理:使用一个队列来存储待处理的任务。任务可以是函数指针及相关参数。需要考虑线程安全,如使用互斥锁保护队列操作。
- 线程的创建与销毁:创建固定数量的线程,线程启动后不断从任务队列中获取任务执行。线程池销毁时,需要等待所有任务执行完毕,然后安全地销毁线程。
2. 关键数据结构
// 任务结构体
typedef struct Task {
void (*func)(void*); // 任务函数指针
void* arg; // 任务函数参数
struct Task* next; // 指向下一个任务的指针
} Task;
// 线程池结构体
typedef struct ThreadPool {
Task* head; // 任务队列头指针
Task* tail; // 任务队列尾指针
pthread_t* threads; // 线程数组
pthread_mutex_t mutex; // 保护任务队列的互斥锁
pthread_cond_t cond; // 条件变量,用于通知线程有新任务
int thread_count; // 线程池中的线程数量
int stop; // 线程池是否停止的标志
} ThreadPool;
3. 主要函数伪代码
// 线程池初始化函数
ThreadPool* createThreadPool(int thread_count) {
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
if (!pool) return NULL;
pool->thread_count = thread_count;
pool->head = pool->tail = NULL;
pool->stop = 0;
pool->threads = (pthread_t*)malloc(thread_count * sizeof(pthread_t));
if (!pool->threads) {
free(pool);
return NULL;
}
pthread_mutex_init(&pool->mutex, NULL);
pthread_cond_init(&pool->cond, NULL);
for (int i = 0; i < thread_count; ++i) {
pthread_create(&pool->threads[i], NULL, worker, pool);
}
return pool;
}
// 线程工作函数
void* worker(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (1) {
Task* task;
pthread_mutex_lock(&pool->mutex);
while (!pool->head &&!pool->stop) {
pthread_cond_wait(&pool->cond, &pool->mutex);
}
if (pool->stop &&!pool->head) {
pthread_mutex_unlock(&pool->mutex);
pthread_exit(NULL);
}
task = pool->head;
pool->head = task->next;
if (!pool->head) {
pool->tail = NULL;
}
pthread_mutex_unlock(&pool->mutex);
(*task->func)(task->arg);
free(task);
}
return NULL;
}
// 添加任务函数
void addTask(ThreadPool* pool, void (*func)(void*), void* arg) {
Task* task = (Task*)malloc(sizeof(Task));
if (!task) return;
task->func = func;
task->arg = arg;
task->next = NULL;
pthread_mutex_lock(&pool->mutex);
if (!pool->tail) {
pool->head = pool->tail = task;
} else {
pool->tail->next = task;
pool->tail = task;
}
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->mutex);
}
// 销毁线程池函数
void destroyThreadPool(ThreadPool* pool) {
pthread_mutex_lock(&pool->mutex);
pool->stop = 1;
pthread_mutex_unlock(&pool->mutex);
pthread_cond_broadcast(&pool->cond);
for (int i = 0; i < pool->thread_count; ++i) {
pthread_join(pool->threads[i], NULL);
}
while (pool->head) {
Task* task = pool->head;
pool->head = task->next;
free(task);
}
free(pool->threads);
pthread_mutex_destroy(&pool->mutex);
pthread_cond_destroy(&pool->cond);
free(pool);
}