MST

星途 面试题库

面试题:Go 固定 worker 工作池负载均衡的优化策略

现有一个基于 Go 的固定 worker 工作池用于处理大量网络请求任务,目前在高并发情况下出现部分 worker 负载过高,部分 worker 空闲的不均衡现象。请分析可能导致该问题的原因,并提出至少两种优化负载均衡的策略,同时阐述每种策略在 Go 中实现的关键要点。
31.0万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

可能原因

  1. 任务分配不均:任务分发机制不合理,导致某些 worker 频繁接收任务,而其他 worker 闲置。
  2. 任务类型差异:不同类型的网络请求任务处理时间差异大,若分发时未考虑此因素,会造成负载不均衡。
  3. 资源竞争:worker 之间存在对共享资源(如数据库连接、文件句柄等)的竞争,部分 worker 因等待资源而空闲,部分 worker 持续处理任务。

优化策略及 Go 实现关键要点

  1. 随机任务分配策略
    • 实现要点:在任务分发时,使用 Go 标准库中的 math/rand 包生成随机数,根据随机数选择一个 worker 来处理任务。例如:
package main

import (
    "fmt"
    "math/rand"
    "time"
)

type Worker struct {
    id int
}

func (w *Worker) Process(task string) {
    fmt.Printf("Worker %d is processing task: %s\n", w.id, task)
}

func main() {
    rand.Seed(time.Now().UnixNano())
    var workers []*Worker
    for i := 0; i < 5; i++ {
        workers = append(workers, &Worker{id: i})
    }
    tasks := []string{"task1", "task2", "task3"}
    for _, task := range tasks {
        randomIndex := rand.Intn(len(workers))
        workers[randomIndex].Process(task)
    }
}
  1. 轮询任务分配策略
    • 实现要点:维护一个全局的索引变量,每次分发任务时,按顺序选择 worker 处理任务。当索引到达 worker 数量上限时,重置为 0。示例代码如下:
package main

import (
    "fmt"
)

type Worker struct {
    id int
}

func (w *Worker) Process(task string) {
    fmt.Printf("Worker %d is processing task: %s\n", w.id, task)
}

func main() {
    var workers []*Worker
    for i := 0; i < 5; i++ {
        workers = append(workers, &Worker{id: i})
    }
    tasks := []string{"task1", "task2", "task3"}
    index := 0
    for _, task := range tasks {
        workers[index].Process(task)
        index = (index + 1) % len(workers)
    }
}
  1. 基于任务优先级的分配策略
    • 实现要点:为任务定义优先级字段,在分发任务时,优先将高优先级任务分配给负载较低的 worker。可以通过维护一个 worker 负载状态的映射,以及一个优先级队列来实现。例如:
package main

import (
    "container/heap"
    "fmt"
)

type Task struct {
    name     string
    priority int
}

type PriorityQueue []Task

func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
    return pq[i].priority > pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
    pq[i], pq[j] = pq[j], pq[i]
}
func (pq *PriorityQueue) Push(x interface{}) {
    *pq = append(*pq, x.(Task))
}
func (pq *PriorityQueue) Pop() interface{} {
    old := *pq
    n := len(old)
    item := old[n - 1]
    *pq = old[0 : n - 1]
    return item
}

type Worker struct {
    id     int
    load   int
}

func main() {
    var workers []*Worker
    for i := 0; i < 5; i++ {
        workers = append(workers, &Worker{id: i, load: 0})
    }
    var pq PriorityQueue
    tasks := []Task{
        {"task1", 3},
        {"task2", 1},
        {"task3", 2},
    }
    for _, task := range tasks {
        heap.Init(&pq)
        for _, w := range workers {
            heap.Push(&pq, Task{fmt.Sprintf("worker %d", w.id), w.load})
        }
        minWorker := heap.Pop(&pq).(Task)
        for _, w := range workers {
            if fmt.Sprintf("worker %d", w.id) == minWorker.name {
                w.load++
                fmt.Printf("Worker %d is processing task: %s with priority %d\n", w.id, task.name, task.priority)
            }
        }
    }
}