面试题答案
一键面试数据结构设计
- Worker结构体
type Worker struct { ID int TaskChan chan Task QuitChan chan struct{} }
ID
用于标识每个worker。TaskChan
是用于接收任务的通道。QuitChan
用于通知worker退出。
- Task结构体
type Task struct { TaskType string Payload interface{} }
TaskType
标识任务类型,用于区分不同业务逻辑。Payload
是任务的具体数据。
- WorkPool结构体
type WorkPool struct { Workers map[int]*Worker TaskQueue chan Task AddWorker chan struct{} RemoveWorker chan int }
Workers
是一个映射,键为worker的ID,值为worker实例,用于管理所有的worker。TaskQueue
是任务队列,用于存放待处理的任务。AddWorker
通道用于接收添加worker的请求。RemoveWorker
通道用于接收移除worker的请求,其值为要移除的worker的ID。
接口定义
- TaskHandler接口
type TaskHandler interface { Handle(task Task) }
- 定义了处理任务的方法,不同类型的任务处理逻辑通过实现这个接口来复用。
关键流程
- 初始化工作池
func NewWorkPool(capacity int) *WorkPool { wp := &WorkPool{ Workers: make(map[int]*Worker), TaskQueue: make(chan Task, capacity), AddWorker: make(chan struct{}), RemoveWorker: make(chan int), } go wp.manageWorkers() return wp }
NewWorkPool
函数初始化工作池,并启动manageWorkers
协程来管理worker的动态添加和移除。
- 添加Worker
func (wp *WorkPool) AddWorker() { wp.AddWorker <- struct{}{} }
AddWorker
方法向AddWorker
通道发送信号,请求添加worker。- 在
manageWorkers
协程中:
func (wp *WorkPool) manageWorkers() { workerID := 0 for { select { case <-wp.AddWorker: w := &Worker{ ID: workerID, TaskChan: make(chan Task), QuitChan: make(chan struct{}), } wp.Workers[workerID] = w go w.start() workerID++ case id := <-wp.RemoveWorker: if w, ok := wp.Workers[id]; ok { close(w.QuitChan) delete(wp.Workers, id) } } } }
- 移除Worker
func (wp *WorkPool) RemoveWorker(id int) { wp.RemoveWorker <- id }
RemoveWorker
方法向RemoveWorker
通道发送要移除的worker的ID。- 在
manageWorkers
协程中处理移除操作,关闭worker的QuitChan
通道并从Workers
映射中删除该worker。
- 提交任务
func (wp *WorkPool) SubmitTask(task Task) { wp.TaskQueue <- task }
SubmitTask
方法将任务放入任务队列TaskQueue
。
- Worker执行任务
func (w *Worker) start() { for { select { case task, ok := <-w.TaskChan: if!ok { return } // 根据TaskType找到对应的TaskHandler实现来处理任务 handler := getTaskHandler(task.TaskType) if handler!= nil { handler.Handle(task) } case <-w.QuitChan: return } } }
start
方法是每个worker的主循环,从TaskChan
接收任务并根据任务类型找到对应的TaskHandler
处理任务。当QuitChan
接收到信号时,worker退出。
并发问题处理
- 资源竞争
- 使用通道进行通信,避免对共享数据的直接竞争。例如,
TaskQueue
、AddWorker
和RemoveWorker
通道保证了任务提交、添加和移除worker操作的安全。 - 对
Workers
映射的操作在manageWorkers
协程内串行化,避免多个协程同时修改导致的数据不一致。
- 使用通道进行通信,避免对共享数据的直接竞争。例如,
- 任务分配
- 采用无锁的通道
TaskQueue
作为任务队列,worker从该队列获取任务,保证任务分配的并发安全。
- 采用无锁的通道
- 优雅退出
- 每个worker通过
QuitChan
接收退出信号,在接收到信号后,安全地退出循环,避免资源泄漏。同时,manageWorkers
协程在移除worker时,先关闭QuitChan
确保worker正常退出。
- 每个worker通过