MST

星途 面试题库

面试题:Go池容量动态调整的策略与实现

给定一个场景,Go池当前正在处理大量的任务,随着业务量的波动,需要动态调整Go池的容量。请详细描述你会采用哪些策略来实现Go池容量的动态调整,并且使用Go代码示例展示如何实现这些策略。
24.0万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

策略

  1. 基于任务队列长度:监控任务队列的长度,如果队列长度超过某个阈值,增加Go池容量;如果队列长度低于某个阈值,减少Go池容量。
  2. 基于系统资源:监控CPU、内存等系统资源的使用情况,当资源使用率较低时,增加Go池容量以充分利用资源;当资源使用率过高时,减少Go池容量以避免系统过载。
  3. 基于任务处理时间:记录每个任务的处理时间,如果平均处理时间较长,说明当前Go池容量可能不足,增加Go池容量;如果平均处理时间较短,说明当前Go池容量可能过大,减少Go池容量。

Go代码示例

package main

import (
    "fmt"
    "sync"
    "time"
)

// Task 定义任务结构
type Task struct {
    ID int
}

// TaskQueue 任务队列
type TaskQueue struct {
    tasks []Task
    mutex sync.Mutex
}

// AddTask 添加任务到队列
func (q *TaskQueue) AddTask(task Task) {
    q.mutex.Lock()
    q.tasks = append(q.tasks, task)
    q.mutex.Unlock()
}

// GetTask 从队列获取任务
func (q *TaskQueue) GetTask() (Task, bool) {
    q.mutex.Lock()
    defer q.mutex.Unlock()
    if len(q.tasks) == 0 {
        return Task{}, false
    }
    task := q.tasks[0]
    q.tasks = q.tasks[1:]
    return task, true
}

// WorkerPool 工作池
type WorkerPool struct {
    workers    int
    taskQueue  *TaskQueue
    wg         sync.WaitGroup
    stop       chan struct{}
    capacityCh chan int
}

// NewWorkerPool 创建新的工作池
func NewWorkerPool(workers int, taskQueue *TaskQueue) *WorkerPool {
    return &WorkerPool{
        workers:    workers,
        taskQueue:  taskQueue,
        stop:       make(chan struct{}),
        capacityCh: make(chan int),
    }
}

// Start 启动工作池
func (wp *WorkerPool) Start() {
    for i := 0; i < wp.workers; i++ {
        wp.wg.Add(1)
        go func() {
            defer wp.wg.Done()
            for {
                select {
                case <-wp.stop:
                    return
                default:
                    task, ok := wp.taskQueue.GetTask()
                    if!ok {
                        time.Sleep(100 * time.Millisecond)
                        continue
                    }
                    // 处理任务
                    fmt.Printf("Worker is processing task %d\n", task.ID)
                    time.Sleep(200 * time.Millisecond)
                }
            }
        }()
    }
    go func() {
        for {
            select {
            case newCapacity := <-wp.capacityCh:
                if newCapacity > wp.workers {
                    for i := wp.workers; i < newCapacity; i++ {
                        wp.wg.Add(1)
                        go func() {
                            defer wp.wg.Done()
                            for {
                                select {
                                case <-wp.stop:
                                    return
                                default:
                                    task, ok := wp.taskQueue.GetTask()
                                    if!ok {
                                        time.Sleep(100 * time.Millisecond)
                                        continue
                                    }
                                    // 处理任务
                                    fmt.Printf("New Worker is processing task %d\n", task.ID)
                                    time.Sleep(200 * time.Millisecond)
                                }
                            }
                        }()
                    }
                } else if newCapacity < wp.workers {
                    // 这里简单处理,直接停止多余的worker
                    for i := wp.workers - 1; i >= newCapacity; i-- {
                        wp.stop <- struct{}{}
                    }
                }
                wp.workers = newCapacity
            }
        }
    }()
}

// Stop 停止工作池
func (wp *WorkerPool) Stop() {
    close(wp.stop)
    wp.wg.Wait()
}

// AdjustCapacity 调整工作池容量
func (wp *WorkerPool) AdjustCapacity(capacity int) {
    wp.capacityCh <- capacity
}

使用示例

func main() {
    taskQueue := &TaskQueue{}
    workerPool := NewWorkerPool(3, taskQueue)
    workerPool.Start()

    // 添加任务
    for i := 1; i <= 10; i++ {
        taskQueue.AddTask(Task{ID: i})
    }

    // 动态调整容量
    go func() {
        time.Sleep(1 * time.Second)
        workerPool.AdjustCapacity(5)
        time.Sleep(2 * time.Second)
        workerPool.AdjustCapacity(2)
    }()

    time.Sleep(5 * time.Second)
    workerPool.Stop()
}

以上代码实现了一个简单的Go工作池,并展示了如何动态调整其容量。WorkerPool结构体包含工作池的相关信息,Start方法启动工作池,AdjustCapacity方法用于动态调整工作池的容量。在main函数中,我们先创建了一个初始容量为3的工作池,添加一些任务,然后通过AdjustCapacity方法动态调整工作池的容量。