面试题答案
一键面试设计思路
- 任务结构体:定义一个任务结构体,包含任务函数指针、任务参数以及任务优先级。
- 线程池结构体:包含线程数组、任务队列、队列锁、条件变量、线程数量、任务队列最大容量、已完成任务计数等。
- 任务队列操作:实现向任务队列添加任务和从任务队列取出任务的函数,操作时需加锁保护队列。
- 线程函数:线程从任务队列中取出任务并执行,若队列为空则等待条件变量通知。
- 优先级处理:在任务队列中按照优先级对任务进行排序,每次取出任务时保证取出优先级最高的任务。
关键代码片段
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <semaphore.h>
// 定义任务结构体
typedef struct Task {
void (*func)(void*);
void *arg;
int priority;
struct Task *next;
} Task;
// 定义线程池结构体
typedef struct ThreadPool {
pthread_t *threads;
Task *taskQueue;
pthread_mutex_t queueLock;
pthread_cond_t queueNotEmpty;
int threadCount;
int queueCapacity;
int queueSize;
int finishedCount;
int shutdown;
} ThreadPool;
// 创建新任务
Task* createTask(void (*func)(void*), void *arg, int priority) {
Task *newTask = (Task*)malloc(sizeof(Task));
newTask->func = func;
newTask->arg = arg;
newTask->priority = priority;
newTask->next = NULL;
return newTask;
}
// 向任务队列添加任务
void addTask(ThreadPool *pool, Task *task) {
pthread_mutex_lock(&pool->queueLock);
while (pool->queueSize == pool->queueCapacity) {
pthread_cond_wait(&pool->queueNotEmpty, &pool->queueLock);
}
Task *current = pool->taskQueue;
if (!current || current->priority < task->priority) {
task->next = pool->taskQueue;
pool->taskQueue = task;
} else {
while (current->next && current->next->priority >= task->priority) {
current = current->next;
}
task->next = current->next;
current->next = task;
}
pool->queueSize++;
pthread_cond_signal(&pool->queueNotEmpty);
pthread_mutex_unlock(&pool->queueLock);
}
// 从任务队列取出任务
Task* getTask(ThreadPool *pool) {
pthread_mutex_lock(&pool->queueLock);
while (pool->queueSize == 0 &&!pool->shutdown) {
pthread_cond_wait(&pool->queueNotEmpty, &pool->queueLock);
}
if (pool->shutdown && pool->queueSize == 0) {
pthread_mutex_unlock(&pool->queueLock);
return NULL;
}
Task *task = pool->taskQueue;
pool->taskQueue = task->next;
pool->queueSize--;
pthread_cond_signal(&pool->queueNotEmpty);
pthread_mutex_unlock(&pool->queueLock);
return task;
}
// 线程执行函数
void* worker(void* arg) {
ThreadPool *pool = (ThreadPool*)arg;
while (1) {
Task *task = getTask(pool);
if (!task) break;
task->func(task->arg);
free(task);
pthread_mutex_lock(&pool->queueLock);
pool->finishedCount++;
pthread_mutex_unlock(&pool->queueLock);
}
return NULL;
}
// 创建线程池
ThreadPool* createThreadPool(int threadCount, int queueCapacity) {
ThreadPool *pool = (ThreadPool*)malloc(sizeof(ThreadPool));
pool->threads = (pthread_t*)malloc(threadCount * sizeof(pthread_t));
pool->taskQueue = NULL;
pool->threadCount = threadCount;
pool->queueCapacity = queueCapacity;
pool->queueSize = 0;
pool->finishedCount = 0;
pool->shutdown = 0;
pthread_mutex_init(&pool->queueLock, NULL);
pthread_cond_init(&pool->queueNotEmpty, NULL);
for (int i = 0; i < threadCount; i++) {
pthread_create(&pool->threads[i], NULL, worker, pool);
}
return pool;
}
// 销毁线程池
void destroyThreadPool(ThreadPool *pool) {
pthread_mutex_lock(&pool->queueLock);
pool->shutdown = 1;
pthread_cond_broadcast(&pool->queueNotEmpty);
pthread_mutex_unlock(&pool->queueLock);
for (int i = 0; i < pool->threadCount; i++) {
pthread_join(pool->threads[i], NULL);
}
while (pool->taskQueue) {
Task *temp = pool->taskQueue;
pool->taskQueue = pool->taskQueue->next;
free(temp);
}
pthread_mutex_destroy(&pool->queueLock);
pthread_cond_destroy(&pool->queueNotEmpty);
free(pool->threads);
free(pool);
}
// 示例任务函数
void exampleTask(void *arg) {
int num = *((int*)arg);
printf("Task %d is running\n", num);
sleep(1);
}
可以通过以下方式使用线程池:
int main() {
ThreadPool *pool = createThreadPool(3, 5);
int nums[5] = {1, 2, 3, 4, 5};
for (int i = 0; i < 5; i++) {
Task *task = createTask(exampleTask, &nums[i], i);
addTask(pool, task);
}
while (1) {
pthread_mutex_lock(&pool->queueLock);
if (pool->finishedCount == 5) break;
pthread_mutex_unlock(&pool->queueLock);
sleep(1);
}
destroyThreadPool(pool);
return 0;
}