面试题答案
一键面试设计思路
- 任务分类:为每个任务添加类型标识,区分I/O密集型和计算密集型任务。
- 队列分离:创建两个任务队列,分别用于存放I/O密集型和计算密集型任务。
- 线程分组:将线程池中的线程分为两组,一组专门处理I/O密集型任务,另一组专门处理计算密集型任务。根据系统资源和任务特点,合理分配两组线程的数量。例如,如果系统I/O资源充足,可适当增加处理I/O密集型任务的线程数量。
- 调度策略:采用不同的调度算法。对于I/O密集型任务队列,使用优先队列,优先处理到达的I/O任务,因为I/O操作通常可以与其他计算操作并行进行,减少I/O等待时间。对于计算密集型任务队列,采用公平调度算法,如时间片轮转调度,确保每个计算任务都能公平地获得CPU时间。
关键代码实现
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
// 任务类型枚举
typedef enum {
IO_BOUND,
COMPUTE_BOUND
} TaskType;
// 任务结构体
typedef struct Task {
void (*func)(void*);
void *arg;
TaskType type;
struct Task *next;
} Task;
// 任务队列结构体
typedef struct Queue {
Task *head;
Task *tail;
pthread_mutex_t mutex;
pthread_cond_t cond;
} Queue;
// 线程池结构体
typedef struct ThreadPool {
pthread_t *threads;
int num_threads;
int io_threads;
Queue io_queue;
Queue compute_queue;
int stop;
} ThreadPool;
// 初始化任务队列
void initQueue(Queue *queue) {
queue->head = NULL;
queue->tail = NULL;
pthread_mutex_init(&queue->mutex, NULL);
pthread_cond_init(&queue->cond, NULL);
}
// 添加任务到队列
void addTask(Queue *queue, Task *task) {
pthread_mutex_lock(&queue->mutex);
if (queue->tail == NULL) {
queue->head = queue->tail = task;
} else {
queue->tail->next = task;
queue->tail = task;
}
pthread_cond_signal(&queue->cond);
pthread_mutex_unlock(&queue->mutex);
}
// 从队列中取出任务
Task* getTask(Queue *queue) {
pthread_mutex_lock(&queue->mutex);
while (queue->head == NULL) {
pthread_cond_wait(&queue->cond, &queue->mutex);
}
Task *task = queue->head;
if (queue->head == queue->tail) {
queue->head = queue->tail = NULL;
} else {
queue->head = queue->head->next;
}
pthread_mutex_unlock(&queue->mutex);
return task;
}
// 线程执行函数
void* worker(void* arg) {
ThreadPool *pool = (ThreadPool *)arg;
while (1) {
Task *task = NULL;
if (pthread_self() < pool->threads[pool->io_threads]) {
task = getTask(&pool->io_queue);
} else {
task = getTask(&pool->compute_queue);
}
if (task == NULL && pool->stop) {
break;
}
task->func(task->arg);
free(task);
}
return NULL;
}
// 初始化线程池
ThreadPool* createThreadPool(int num_threads, int io_threads) {
ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
pool->num_threads = num_threads;
pool->io_threads = io_threads;
pool->threads = (pthread_t *)malloc(num_threads * sizeof(pthread_t));
initQueue(&pool->io_queue);
initQueue(&pool->compute_queue);
pool->stop = 0;
for (int i = 0; i < num_threads; i++) {
pthread_create(&pool->threads[i], NULL, worker, pool);
}
return pool;
}
// 向线程池添加任务
void addTaskToPool(ThreadPool *pool, void (*func)(void*), void *arg, TaskType type) {
Task *task = (Task *)malloc(sizeof(Task));
task->func = func;
task->arg = arg;
task->type = type;
task->next = NULL;
if (type == IO_BOUND) {
addTask(&pool->io_queue, task);
} else {
addTask(&pool->compute_queue, task);
}
}
// 销毁线程池
void destroyThreadPool(ThreadPool *pool) {
pool->stop = 1;
for (int i = 0; i < pool->num_threads; i++) {
pthread_cancel(pool->threads[i]);
pthread_join(pool->threads[i], NULL);
}
free(pool->threads);
pthread_mutex_destroy(&pool->io_queue.mutex);
pthread_cond_destroy(&pool->io_queue.cond);
pthread_mutex_destroy(&pool->compute_queue.mutex);
pthread_cond_destroy(&pool->compute_queue.cond);
free(pool);
}
// 示例任务函数
void ioTask(void* arg) {
printf("IO task is running\n");
// 模拟I/O操作
sleep(1);
}
void computeTask(void* arg) {
printf("Compute task is running\n");
// 模拟计算操作
for (int i = 0; i < 1000000000; i++);
}
int main() {
ThreadPool *pool = createThreadPool(5, 2);
addTaskToPool(pool, ioTask, NULL, IO_BOUND);
addTaskToPool(pool, computeTask, NULL, COMPUTE_BOUND);
sleep(2);
destroyThreadPool(pool);
return 0;
}
上述代码实现了一个简单的线程池,通过将任务按类型分类,分别放入不同队列,并由不同线程组处理,以适应不同类型任务的处理需求,提高系统整体性能。关键部分包括任务队列的操作、线程执行函数以及线程池的创建和销毁等。