主要步骤
- 任务队列设计:创建一个任务队列,用于存储新到来的任务。
- 线程状态管理:每个线程需要有一个状态标识,表明是空闲还是忙碌。
- 任务分配逻辑:当新任务到来时,找到一个空闲线程,并将任务分配给它。如果没有空闲线程,则将任务放入任务队列。
- 线程循环:每个线程在启动后,不断检查自己是否有任务可执行,若没有则进入空闲状态,若有则执行任务。
关键数据结构
- 任务结构体:
typedef struct Task {
void (*func)(void*); // 任务函数指针
void* arg; // 任务函数参数
struct Task* next; // 指向下一个任务的指针
} Task;
- 线程池结构体:
typedef struct ThreadPool {
Task* taskQueue; // 任务队列头指针
Task* queueTail; // 任务队列尾指针
pthread_t* threads; // 线程数组
int threadCount; // 线程数量
int *threadStatus; // 线程状态数组,0表示空闲,1表示忙碌
} ThreadPool;
关键函数伪代码
- 初始化线程池:
ThreadPool* createThreadPool(int numThreads) {
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
pool->taskQueue = NULL;
pool->queueTail = NULL;
pool->threadCount = numThreads;
pool->threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t));
pool->threadStatus = (int*)malloc(numThreads * sizeof(int));
for (int i = 0; i < numThreads; i++) {
pool->threadStatus[i] = 0;
pthread_create(&pool->threads[i], NULL, threadFunction, pool);
}
return pool;
}
- 添加任务:
void addTask(ThreadPool* pool, void (*func)(void*), void* arg) {
Task* newTask = (Task*)malloc(sizeof(Task));
newTask->func = func;
newTask->arg = arg;
newTask->next = NULL;
for (int i = 0; i < pool->threadCount; i++) {
if (pool->threadStatus[i] == 0) {
pool->threadStatus[i] = 1;
pthread_t tid = pool->threads[i];
pthread_create(&tid, NULL, executeTask, newTask);
return;
}
}
if (pool->queueTail == NULL) {
pool->taskQueue = newTask;
pool->queueTail = newTask;
} else {
pool->queueTail->next = newTask;
pool->queueTail = newTask;
}
}
- 线程执行函数:
void* threadFunction(void* arg) {
ThreadPool* pool = (ThreadPool*)arg;
while (1) {
if (pool->taskQueue != NULL) {
Task* task = pool->taskQueue;
pool->taskQueue = task->next;
if (pool->taskQueue == NULL) {
pool->queueTail = NULL;
}
executeTask(task);
} else {
// 空闲状态,可设置为等待信号量等优化
pool->threadStatus[?] = 0;
sleep(1);
}
}
return NULL;
}
- 执行任务函数:
void executeTask(Task* task) {
task->func(task->arg);
free(task);
}
注意事项
- 实际代码中需要考虑线程安全问题,如对任务队列的操作需要加锁。
- 可以使用信号量等机制优化线程等待和任务分配过程。