MST

星途 面试题库

面试题:C语言Linux线程池模型下任务优先级队列的优化

假设已有一个基于C语言在Linux下实现的线程池模型且具备任务优先级设定功能,但目前任务优先级队列在高并发情况下性能较低。请分析可能存在的性能瓶颈,并提出优化方案,同时给出代码修改示例。
50.1万 热度难度
编程语言C

知识考点

AI 面试

面试题答案

一键面试

性能瓶颈分析

  1. 锁竞争:在高并发环境下,对任务优先级队列进行操作(如插入、删除任务)时,若使用普通锁进行同步,会导致大量线程竞争锁资源,造成性能瓶颈。
  2. 队列操作时间复杂度:如果任务优先级队列采用普通链表或数组实现,插入和删除操作的时间复杂度较高(链表插入删除为O(n),数组插入删除在某些情况下为O(n)),在高并发频繁操作队列时性能较低。
  3. 上下文切换开销:线程频繁竞争资源,导致操作系统频繁进行上下文切换,增加系统开销。

优化方案

  1. 锁优化
    • 使用读写锁(pthread_rwlock),对于读操作(如获取任务),多个线程可以同时进行,减少锁竞争。
    • 采用细粒度锁,只在对任务优先级队列进行关键操作(如插入、删除)时加锁,而不是整个线程池操作都加同一把锁。
  2. 数据结构优化
    • 采用堆(如二叉堆)来实现任务优先级队列。堆的插入和删除操作时间复杂度为O(log n),相比普通链表和数组在频繁操作时性能更好。
    • 可以使用平衡二叉搜索树(如红黑树),其插入、删除和查找操作时间复杂度均为O(log n),并且可以维护元素的顺序,适合实现优先级队列。
  3. 减少上下文切换
    • 调整线程池的线程数量,避免线程数量过多导致过度竞争。可以根据系统CPU核心数、任务类型等动态调整线程池大小。
    • 使用线程本地存储(TLS,pthread_key_t)来减少线程间的数据共享,降低锁的使用频率。

代码修改示例(以堆为例优化任务优先级队列)

假设原任务结构体如下:

typedef struct Task {
    void (*func)(void*);
    void *arg;
    int priority;
    struct Task *next;
} Task;

原线程池结构体假设如下:

typedef struct ThreadPool {
    Task *queue;
    pthread_mutex_t queue_lock;
    int queue_size;
    int max_queue_size;
    // 其他线程池相关成员
} ThreadPool;

优化后的代码:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

// 定义任务结构体
typedef struct Task {
    void (*func)(void*);
    void *arg;
    int priority;
} Task;

// 定义线程池结构体
typedef struct ThreadPool {
    Task **heap;
    pthread_rwlock_t rwlock;
    int heap_size;
    int max_heap_size;
    // 其他线程池相关成员
} ThreadPool;

// 交换两个任务
void swap(Task **a, Task **b) {
    Task *temp = *a;
    *a = *b;
    *b = temp;
}

// 上浮操作,维护堆性质
void heapify_up(ThreadPool *pool, int index) {
    while (index > 0) {
        int parent = (index - 1) / 2;
        if (pool->heap[parent]->priority < pool->heap[index]->priority) {
            swap(&pool->heap[parent], &pool->heap[index]);
            index = parent;
        } else {
            break;
        }
    }
}

// 下沉操作,维护堆性质
void heapify_down(ThreadPool *pool, int index) {
    int largest = index;
    int left = 2 * index + 1;
    int right = 2 * index + 2;

    if (left < pool->heap_size && pool->heap[left]->priority > pool->heap[largest]->priority) {
        largest = left;
    }
    if (right < pool->heap_size && pool->heap[right]->priority > pool->heap[largest]->priority) {
        largest = right;
    }

    if (largest != index) {
        swap(&pool->heap[index], &pool->heap[largest]);
        heapify_down(pool, largest);
    }
}

// 插入任务到优先级队列
void enqueue(ThreadPool *pool, Task *task) {
    pthread_rwlock_wrlock(&pool->rwlock);
    if (pool->heap_size == pool->max_heap_size) {
        // 处理队列满的情况,如扩容
        pool->max_heap_size *= 2;
        pool->heap = realloc(pool->heap, pool->max_heap_size * sizeof(Task*));
    }
    pool->heap[pool->heap_size++] = task;
    heapify_up(pool, pool->heap_size - 1);
    pthread_rwlock_unlock(&pool->rwlock);
}

// 从优先级队列取出任务
Task* dequeue(ThreadPool *pool) {
    pthread_rwlock_wrlock(&pool->rwlock);
    if (pool->heap_size == 0) {
        pthread_rwlock_unlock(&pool->rwlock);
        return NULL;
    }
    Task *task = pool->heap[0];
    pool->heap[0] = pool->heap[--pool->heap_size];
    heapify_down(pool, 0);
    pthread_rwlock_unlock(&pool->rwlock);
    return task;
}

// 示例任务函数
void example_task(void *arg) {
    printf("Task with priority %d is running\n", *((int*)arg));
}

// 线程执行函数
void* thread_function(void *arg) {
    ThreadPool *pool = (ThreadPool*)arg;
    while (1) {
        Task *task = dequeue(pool);
        if (task == NULL) {
            // 可以根据情况处理任务队列为空的情况,如等待新任务
            sleep(1);
            continue;
        }
        task->func(task->arg);
        free(task);
    }
    return NULL;
}

int main() {
    ThreadPool pool;
    pool.heap_size = 0;
    pool.max_heap_size = 10;
    pool.heap = (Task**)malloc(pool.max_heap_size * sizeof(Task*));
    pthread_rwlock_init(&pool.rwlock, NULL);

    pthread_t threads[5];
    for (int i = 0; i < 5; i++) {
        pthread_create(&threads[i], NULL, thread_function, &pool);
    }

    for (int i = 0; i < 10; i++) {
        Task *task = (Task*)malloc(sizeof(Task));
        task->func = example_task;
        int priority = rand() % 100;
        task->arg = &priority;
        task->priority = priority;
        enqueue(&pool, task);
    }

    for (int i = 0; i < 5; i++) {
        pthread_join(threads[i], NULL);
    }

    pthread_rwlock_destroy(&pool.rwlock);
    free(pool.heap);
    return 0;
}

上述代码通过使用堆来优化任务优先级队列,并且使用读写锁来减少锁竞争,提高高并发情况下的性能。实际应用中,还需结合线程池的整体逻辑进行全面优化。