数据结构设计
- 任务结构体:用于表示一个任务,包含任务函数指针、任务函数的参数,以及指向队列中下一个任务的指针。
/*
 * One unit of work queued on the thread pool. Tasks are chained into a
 * singly linked list through `next`.
 */
typedef struct Task {
void* (*func)(void*); // task entry point; a worker calls it with `arg` and ignores the returned pointer
void* arg; // opaque argument passed to `func`
struct Task* next; // next task in the queue, or NULL if this is the last one
} Task;
- 任务队列结构体:用于管理任务链表。
/*
 * FIFO of pending tasks kept as a singly linked list: addTask appends at
 * `tail`, worker removes from `head`. Both pointers are NULL when the queue
 * is empty.
 */
typedef struct TaskQueue {
Task* head; // oldest pending task (the next one to run), NULL when empty
Task* tail; // most recently appended task, NULL when empty
} TaskQueue;
- 线程池结构体:包含线程数组、任务队列、线程数量、空闲线程数量,以及用于同步的互斥锁、条件变量和停止标志。
/*
 * State shared between the pool owner and all worker threads.
 * While workers are running, `taskQueue`, `idleCount` and `stop` are only
 * read or written with `mutex` held.
 */
typedef struct ThreadPool {
pthread_t* threads; // heap-allocated array of worker thread handles
TaskQueue taskQueue; // pending tasks waiting for a worker
int threadCount; // number of workers requested at creation; destroyThreadPool joins this many
int idleCount; // number of workers currently blocked in pthread_cond_wait on `cond`
pthread_mutex_t mutex; // guards taskQueue, idleCount and stop
pthread_cond_t cond; // signalled by addTask when a worker is idle; broadcast on shutdown
int stop; // set to 1 by destroyThreadPool; workers exit once it is set and the queue is empty
} ThreadPool;
关键调度逻辑代码片段
- 初始化线程池:创建线程并初始化相关变量。
/* Worker entry point; declared here because it is defined after this function. */
void* worker(void* arg);

/*
 * Create a thread pool with `numThreads` worker threads.
 *
 * Parameters:
 *   numThreads - number of workers to start; must be greater than zero.
 *
 * Returns a heap-allocated pool that the caller must release with
 * destroyThreadPool(), or NULL if `numThreads` is not positive, memory could
 * not be allocated, or a mutex / condition variable / thread could not be
 * created. On failure nothing is leaked: workers that were already started
 * are told to stop and are joined before the function returns.
 */
ThreadPool* createThreadPool(int numThreads) {
    if (numThreads <= 0) {
        return NULL;
    }

    ThreadPool* pool = malloc(sizeof *pool);
    if (pool == NULL) {
        return NULL;
    }

    pool->threadCount = numThreads;
    pool->idleCount = 0;
    pool->stop = 0;
    pool->taskQueue.head = NULL;
    pool->taskQueue.tail = NULL;

    /* calloc checks the count * size multiplication for overflow. */
    pool->threads = calloc((size_t)numThreads, sizeof *pool->threads);
    if (pool->threads == NULL) {
        goto fail_pool;
    }
    if (pthread_mutex_init(&pool->mutex, NULL) != 0) {
        goto fail_threads;
    }
    if (pthread_cond_init(&pool->cond, NULL) != 0) {
        goto fail_mutex;
    }

    for (int i = 0; i < numThreads; ++i) {
        if (pthread_create(&pool->threads[i], NULL, worker, pool) != 0) {
            /*
             * Roll back: the queue is empty, so every worker started so far
             * exits as soon as it observes the stop flag.
             */
            pthread_mutex_lock(&pool->mutex);
            pool->stop = 1;
            pthread_cond_broadcast(&pool->cond);
            pthread_mutex_unlock(&pool->mutex);
            for (int j = 0; j < i; ++j) {
                pthread_join(pool->threads[j], NULL);
            }
            goto fail_cond;
        }
    }
    return pool;

fail_cond:
    pthread_cond_destroy(&pool->cond);
fail_mutex:
    pthread_mutex_destroy(&pool->mutex);
fail_threads:
    free(pool->threads);
fail_pool:
    free(pool);
    return NULL;
}
- 工作线程函数:从任务队列中取出任务并执行。
/*
 * Thread entry point run by every pool worker.
 *
 * `arg` is the ThreadPool the worker belongs to. The worker repeatedly takes
 * the task at the head of the queue, runs it outside the lock and frees the
 * task node. It terminates via pthread_exit(NULL) once the pool's stop flag
 * is set and the queue is empty, so tasks still queued at shutdown are run
 * before the worker leaves.
 */
void* worker(void* arg) {
    ThreadPool* const tp = arg;

    for (;;) {
        pthread_mutex_lock(&tp->mutex);

        /* Sleep until there is work or a shutdown request; the loop re-checks
         * the condition after every wake-up. */
        while (!tp->stop && tp->taskQueue.head == NULL) {
            ++tp->idleCount;
            pthread_cond_wait(&tp->cond, &tp->mutex);
            --tp->idleCount;
        }

        Task* job = tp->taskQueue.head;
        if (job == NULL) {
            /* The wait loop only ends with an empty queue when stop is set. */
            pthread_mutex_unlock(&tp->mutex);
            pthread_exit(NULL);
        }

        /* Unlink the head; when the list becomes empty the tail must be
         * cleared as well. */
        tp->taskQueue.head = job->next;
        if (tp->taskQueue.head == NULL) {
            tp->taskQueue.tail = NULL;
        }
        pthread_mutex_unlock(&tp->mutex);

        /* Run the task without holding the lock so other workers can proceed. */
        job->func(job->arg);
        free(job);
    }
    return NULL;
}
- 添加任务到任务队列:
/*
 * Append a task to the end of the pool's queue and wake one idle worker.
 *
 * Parameters:
 *   pool - pool to submit to.
 *   func - function a worker will call; its return value is ignored.
 *   arg  - opaque argument passed to `func`; ownership stays with the caller
 *          or with `func`, the pool never frees it.
 *
 * The function has no return value, so failures cannot be reported: if
 * `pool` or `func` is NULL, or the task node cannot be allocated, the call
 * returns without queuing anything.
 */
void addTask(ThreadPool* pool, void* (*func)(void*), void* arg) {
    if (pool == NULL || func == NULL) {
        return;
    }

    Task* task = malloc(sizeof *task);
    if (task == NULL) {
        return; /* out of memory: the task is dropped instead of dereferencing NULL */
    }
    task->func = func;
    task->arg = arg;
    task->next = NULL;

    pthread_mutex_lock(&pool->mutex);
    if (pool->taskQueue.tail == NULL) {
        /* Empty queue: the new node is both head and tail. */
        pool->taskQueue.head = task;
    } else {
        pool->taskQueue.tail->next = task;
    }
    pool->taskQueue.tail = task;

    /* Only signal when some worker is actually waiting on the condition. */
    if (pool->idleCount > 0) {
        pthread_cond_signal(&pool->cond);
    }
    pthread_mutex_unlock(&pool->mutex);
}
- 销毁线程池:
/*
 * Shut the pool down and release everything it owns.
 *
 * Sets the stop flag, wakes all workers and joins them; workers finish the
 * tasks that are still queued before they exit. Afterwards the mutex, the
 * condition variable, the thread array, any task nodes left in the queue and
 * the pool itself are released. `pool` must not be used after this call.
 *
 * Passing NULL is a no-op, mirroring free(NULL).
 */
void destroyThreadPool(ThreadPool* pool) {
    if (pool == NULL) {
        return;
    }

    /* Publish the stop request under the lock so that no worker can miss it
     * between testing the flag and going to sleep. */
    pthread_mutex_lock(&pool->mutex);
    pool->stop = 1;
    pthread_mutex_unlock(&pool->mutex);
    pthread_cond_broadcast(&pool->cond);

    for (int i = 0; i < pool->threadCount; ++i) {
        pthread_join(pool->threads[i], NULL);
    }

    pthread_mutex_destroy(&pool->mutex);
    pthread_cond_destroy(&pool->cond);
    free(pool->threads);

    /* All workers are gone, so the queue can be walked without locking. */
    Task* task = pool->taskQueue.head;
    while (task != NULL) {
        Task* next = task->next;
        free(task);
        task = next;
    }
    free(pool);
}