1. 实现简单线程池的步骤
- 定义线程池结构体:
typedef struct {
pthread_t *threads; // 线程数组
int thread_count; // 线程数量
int queue_size; // 任务队列大小
int head; // 任务队列头
int tail; // 任务队列尾
int count; // 当前任务数量
void **tasks; // 任务队列
pthread_mutex_t lock; // 互斥锁
pthread_cond_t cond; // 条件变量
int shutdown; // 关闭标志
} ThreadPool;
- 初始化线程池:
ThreadPool* createThreadPool(int thread_count, int queue_size) {
ThreadPool *pool = (ThreadPool*)malloc(sizeof(ThreadPool));
pool->thread_count = thread_count;
pool->queue_size = queue_size;
pool->head = 0;
pool->tail = 0;
pool->count = 0;
pool->shutdown = 0;
pool->threads = (pthread_t*)malloc(thread_count * sizeof(pthread_t));
pool->tasks = (void**)malloc(queue_size * sizeof(void*));
pthread_mutex_init(&pool->lock, NULL);
pthread_cond_init(&pool->cond, NULL);
for (int i = 0; i < thread_count; i++) {
pthread_create(&pool->threads[i], NULL, worker, (void*)pool);
}
return pool;
}
- 定义任务函数指针类型:
typedef void* (*TaskFunction)(void*);
- 向线程池添加任务:
int addTask(ThreadPool *pool, TaskFunction func, void *arg) {
pthread_mutex_lock(&pool->lock);
while (pool->count == pool->queue_size &&!pool->shutdown) {
pthread_cond_wait(&pool->cond, &pool->lock);
}
if (pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
return -1;
}
pool->tasks[pool->tail] = (void*)func;
pool->tasks[pool->tail + 1] = arg;
pool->tail = (pool->tail + 2) % pool->queue_size;
pool->count++;
pthread_cond_signal(&pool->cond);
pthread_mutex_unlock(&pool->lock);
return 0;
}
- 工作线程函数:
void* worker(void *arg) {
ThreadPool *pool = (ThreadPool*)arg;
while (1) {
pthread_mutex_lock(&pool->lock);
while (pool->count == 0 &&!pool->shutdown) {
pthread_cond_wait(&pool->cond, &pool->lock);
}
if (pool->shutdown && pool->count == 0) {
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
TaskFunction func = (TaskFunction)pool->tasks[pool->head];
void *arg = pool->tasks[pool->head + 1];
pool->head = (pool->head + 2) % pool->queue_size;
pool->count--;
pthread_mutex_unlock(&pool->lock);
func(arg);
}
return NULL;
}
- 销毁线程池:
void destroyThreadPool(ThreadPool *pool) {
pthread_mutex_lock(&pool->lock);
pool->shutdown = 1;
pthread_cond_broadcast(&pool->cond);
pthread_mutex_unlock(&pool->lock);
for (int i = 0; i < pool->thread_count; i++) {
pthread_join(pool->threads[i], NULL);
}
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->cond);
free(pool->threads);
free(pool->tasks);
free(pool);
}
2. 先来先服务任务调度策略基本实现思路
- 任务入队:当有新任务到来时,按照顺序将任务添加到任务队列的尾部。在上述实现中,
addTask
函数通过pool->tail
指针将任务添加到任务队列末尾。
- 任务出队:工作线程从任务队列头部取出任务执行。在
worker
函数中,通过pool->head
指针从任务队列头部获取任务。由于任务是按照顺序入队的,所以先进入队列的任务会先被取出执行,从而实现先来先服务的调度策略。