MST

星途 面试题库

面试题:C语言线程池动态任务调度优化

假设已有一个C语言线程池,任务到来速率不定,如何优化任务调度策略,以适应不同类型任务(如I/O密集型和计算密集型)的处理需求,尽可能提高系统整体性能?请说明你的设计思路及关键代码实现。
44.8万 热度难度
编程语言C

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务分类:为每个任务添加类型标识,区分I/O密集型和计算密集型任务。
  2. 队列分离:创建两个任务队列,分别用于存放I/O密集型和计算密集型任务。
  3. 线程分组:将线程池中的线程分为两组,一组专门处理I/O密集型任务,另一组专门处理计算密集型任务。根据系统资源和任务特点,合理分配两组线程的数量。例如,如果系统I/O资源充足,可适当增加处理I/O密集型任务的线程数量。
  4. 调度策略:采用不同的调度算法。对于I/O密集型任务队列,使用优先队列,优先处理到达的I/O任务,因为I/O操作通常可以与其他计算操作并行进行,减少I/O等待时间。对于计算密集型任务队列,采用公平调度算法,如时间片轮转调度,确保每个计算任务都能公平地获得CPU时间。

关键代码实现

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

// 任务类型枚举
typedef enum {
    IO_BOUND,
    COMPUTE_BOUND
} TaskType;

// 任务结构体
typedef struct Task {
    void (*func)(void*);
    void *arg;
    TaskType type;
    struct Task *next;
} Task;

// 任务队列结构体
typedef struct Queue {
    Task *head;
    Task *tail;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
} Queue;

// 线程池结构体
typedef struct ThreadPool {
    pthread_t *threads;
    int num_threads;
    int io_threads;
    Queue io_queue;
    Queue compute_queue;
    int stop;
} ThreadPool;

// 初始化任务队列
void initQueue(Queue *queue) {
    queue->head = NULL;
    queue->tail = NULL;
    pthread_mutex_init(&queue->mutex, NULL);
    pthread_cond_init(&queue->cond, NULL);
}

// 添加任务到队列
void addTask(Queue *queue, Task *task) {
    pthread_mutex_lock(&queue->mutex);
    if (queue->tail == NULL) {
        queue->head = queue->tail = task;
    } else {
        queue->tail->next = task;
        queue->tail = task;
    }
    pthread_cond_signal(&queue->cond);
    pthread_mutex_unlock(&queue->mutex);
}

// 从队列中取出任务
Task* getTask(Queue *queue) {
    pthread_mutex_lock(&queue->mutex);
    while (queue->head == NULL) {
        pthread_cond_wait(&queue->cond, &queue->mutex);
    }
    Task *task = queue->head;
    if (queue->head == queue->tail) {
        queue->head = queue->tail = NULL;
    } else {
        queue->head = queue->head->next;
    }
    pthread_mutex_unlock(&queue->mutex);
    return task;
}

// 线程执行函数
void* worker(void* arg) {
    ThreadPool *pool = (ThreadPool *)arg;
    while (1) {
        Task *task = NULL;
        if (pthread_self() < pool->threads[pool->io_threads]) {
            task = getTask(&pool->io_queue);
        } else {
            task = getTask(&pool->compute_queue);
        }
        if (task == NULL && pool->stop) {
            break;
        }
        task->func(task->arg);
        free(task);
    }
    return NULL;
}

// 初始化线程池
ThreadPool* createThreadPool(int num_threads, int io_threads) {
    ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));
    pool->num_threads = num_threads;
    pool->io_threads = io_threads;
    pool->threads = (pthread_t *)malloc(num_threads * sizeof(pthread_t));
    initQueue(&pool->io_queue);
    initQueue(&pool->compute_queue);
    pool->stop = 0;
    for (int i = 0; i < num_threads; i++) {
        pthread_create(&pool->threads[i], NULL, worker, pool);
    }
    return pool;
}

// 向线程池添加任务
void addTaskToPool(ThreadPool *pool, void (*func)(void*), void *arg, TaskType type) {
    Task *task = (Task *)malloc(sizeof(Task));
    task->func = func;
    task->arg = arg;
    task->type = type;
    task->next = NULL;
    if (type == IO_BOUND) {
        addTask(&pool->io_queue, task);
    } else {
        addTask(&pool->compute_queue, task);
    }
}

// 销毁线程池
void destroyThreadPool(ThreadPool *pool) {
    pool->stop = 1;
    for (int i = 0; i < pool->num_threads; i++) {
        pthread_cancel(pool->threads[i]);
        pthread_join(pool->threads[i], NULL);
    }
    free(pool->threads);
    pthread_mutex_destroy(&pool->io_queue.mutex);
    pthread_cond_destroy(&pool->io_queue.cond);
    pthread_mutex_destroy(&pool->compute_queue.mutex);
    pthread_cond_destroy(&pool->compute_queue.cond);
    free(pool);
}

// 示例任务函数
void ioTask(void* arg) {
    printf("IO task is running\n");
    // 模拟I/O操作
    sleep(1);
}

void computeTask(void* arg) {
    printf("Compute task is running\n");
    // 模拟计算操作
    for (int i = 0; i < 1000000000; i++);
}

int main() {
    ThreadPool *pool = createThreadPool(5, 2);
    addTaskToPool(pool, ioTask, NULL, IO_BOUND);
    addTaskToPool(pool, computeTask, NULL, COMPUTE_BOUND);
    sleep(2);
    destroyThreadPool(pool);
    return 0;
}

上述代码实现了一个简单的线程池,通过将任务按类型分类,分别放入不同队列,并由不同线程组处理,以适应不同类型任务的处理需求,提高系统整体性能。关键部分包括任务队列的操作、线程执行函数以及线程池的创建和销毁等。