策略设计
- 任务队列:使用一个共享的任务队列,将所有待处理任务放入其中。
- 工作池:创建固定数量的worker,每个worker从任务队列中不断取出任务进行处理。
- 调度机制:采用公平调度算法,例如轮询调度,确保每个worker有均等机会获取任务。
代码实现思路
- 初始化任务队列:使用Go的
channel
来实现任务队列,channel
本身是线程安全的。
- 启动worker:创建固定数量的goroutine作为worker,每个goroutine从任务队列中获取任务并处理。
- 任务分发:将任务发送到任务队列中,由worker自动获取处理。
关键代码片段
package main
import (
"fmt"
)
// 定义任务结构体
type Task struct {
ID int
// 其他任务相关数据
}
func worker(id int, taskQueue <-chan Task) {
for task := range taskQueue {
fmt.Printf("Worker %d is processing task %d\n", id, task.ID)
// 模拟任务处理
// 实际应用中替换为真实任务处理逻辑
}
fmt.Printf("Worker %d stopped\n", id)
}
func main() {
const numWorkers = 3
const numTasks = 10
taskQueue := make(chan Task, numTasks)
// 启动worker
for i := 0; i < numWorkers; i++ {
go worker(i, taskQueue)
}
// 放入任务
for i := 0; i < numTasks; i++ {
taskQueue <- Task{ID: i}
}
close(taskQueue)
// 等待所有worker完成任务,可使用sync.WaitGroup完善此部分
select {}
}