MST

星途 面试题库

面试题:Go固定worker工作池代码复用的架构扩展

假设你正在开发一个大型分布式系统,使用Go的固定worker工作池处理各种复杂业务逻辑。随着业务增长,需要对工作池的代码复用机制进行架构扩展,以支持动态添加和移除worker,同时保证不同类型任务处理逻辑的高效复用。请设计一个详细的架构方案,包括数据结构、接口定义、关键流程等,并阐述如何处理可能出现的并发问题。
40.4万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

数据结构设计

  1. Worker结构体
    type Worker struct {
        ID       int
        TaskChan chan Task
        QuitChan chan struct{}
    }
    
    • ID 用于标识每个worker。
    • TaskChan 是用于接收任务的通道。
    • QuitChan 用于通知worker退出。
  2. Task结构体
    type Task struct {
        TaskType string
        Payload  interface{}
    }
    
    • TaskType 标识任务类型,用于区分不同业务逻辑。
    • Payload 是任务的具体数据。
  3. WorkPool结构体
    type WorkPool struct {
        Workers    map[int]*Worker
        TaskQueue  chan Task
        AddWorker  chan struct{}
        RemoveWorker chan int
    }
    
    • Workers 是一个映射,键为worker的ID,值为worker实例,用于管理所有的worker。
    • TaskQueue 是任务队列,用于存放待处理的任务。
    • AddWorker 通道用于接收添加worker的请求。
    • RemoveWorker 通道用于接收移除worker的请求,其值为要移除的worker的ID。

接口定义

  1. TaskHandler接口
    type TaskHandler interface {
        Handle(task Task)
    }
    
    • 定义了处理任务的方法,不同类型的任务处理逻辑通过实现这个接口来复用。

关键流程

  1. 初始化工作池
    func NewWorkPool(capacity int) *WorkPool {
        wp := &WorkPool{
            Workers:    make(map[int]*Worker),
            TaskQueue:  make(chan Task, capacity),
            AddWorker:  make(chan struct{}),
            RemoveWorker: make(chan int),
        }
        go wp.manageWorkers()
        return wp
    }
    
    • NewWorkPool 函数初始化工作池,并启动 manageWorkers 协程来管理worker的动态添加和移除。
  2. 添加Worker
    func (wp *WorkPool) AddWorker() {
        wp.AddWorker <- struct{}{}
    }
    
    • AddWorker 方法向 AddWorker 通道发送信号,请求添加worker。
    • manageWorkers 协程中:
    func (wp *WorkPool) manageWorkers() {
        workerID := 0
        for {
            select {
            case <-wp.AddWorker:
                w := &Worker{
                    ID:       workerID,
                    TaskChan: make(chan Task),
                    QuitChan: make(chan struct{}),
                }
                wp.Workers[workerID] = w
                go w.start()
                workerID++
            case id := <-wp.RemoveWorker:
                if w, ok := wp.Workers[id]; ok {
                    close(w.QuitChan)
                    delete(wp.Workers, id)
                }
            }
        }
    }
    
  3. 移除Worker
    func (wp *WorkPool) RemoveWorker(id int) {
        wp.RemoveWorker <- id
    }
    
    • RemoveWorker 方法向 RemoveWorker 通道发送要移除的worker的ID。
    • manageWorkers 协程中处理移除操作,关闭worker的 QuitChan 通道并从 Workers 映射中删除该worker。
  4. 提交任务
    func (wp *WorkPool) SubmitTask(task Task) {
        wp.TaskQueue <- task
    }
    
    • SubmitTask 方法将任务放入任务队列 TaskQueue
  5. Worker执行任务
    func (w *Worker) start() {
        for {
            select {
            case task, ok := <-w.TaskChan:
                if!ok {
                    return
                }
                // 根据TaskType找到对应的TaskHandler实现来处理任务
                handler := getTaskHandler(task.TaskType)
                if handler!= nil {
                    handler.Handle(task)
                }
            case <-w.QuitChan:
                return
            }
        }
    }
    
    • start 方法是每个worker的主循环,从 TaskChan 接收任务并根据任务类型找到对应的 TaskHandler 处理任务。当 QuitChan 接收到信号时,worker退出。

并发问题处理

  1. 资源竞争
    • 使用通道进行通信,避免对共享数据的直接竞争。例如,TaskQueueAddWorkerRemoveWorker 通道保证了任务提交、添加和移除worker操作的安全。
    • Workers 映射的操作在 manageWorkers 协程内串行化,避免多个协程同时修改导致的数据不一致。
  2. 任务分配
    • 采用无锁的通道 TaskQueue 作为任务队列,worker从该队列获取任务,保证任务分配的并发安全。
  3. 优雅退出
    • 每个worker通过 QuitChan 接收退出信号,在接收到信号后,安全地退出循环,避免资源泄漏。同时,manageWorkers 协程在移除worker时,先关闭 QuitChan 确保worker正常退出。