设计思路
- 任务结构体定义:定义不同类型任务的结构体,在结构体中添加优先级字段用于任务排序。
- 任务队列:使用一个优先级队列(可以借助
container/heap
包实现)来存储任务,这样能保证按照优先级顺序取出任务。
- 调度器:调度器从优先级队列中取出任务,通过
channel
将任务发送给工作协程执行。
- 工作协程:多个工作协程从
channel
接收任务并执行,执行完成后向另一个channel
发送执行结果(包括错误信息)。
- 错误处理:在调度器中监听结果
channel
,当接收到错误时,进行相应的错误处理,比如记录日志、重试任务等。
关键代码片段
package main
import (
"container/heap"
"fmt"
)
// 定义任务结构体
type Task struct {
ID int
Priority int
// 其他任务相关字段
}
// 定义优先级队列
type TaskQueue []*Task
func (pq TaskQueue) Len() int { return len(pq) }
func (pq TaskQueue) Less(i, j int) bool {
// 按照优先级从高到低排序
return pq[i].Priority > pq[j].Priority
}
func (pq TaskQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
}
func (pq *TaskQueue) Push(x interface{}) {
*pq = append(*pq, x.(*Task))
}
func (pq *TaskQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n - 1]
*pq = old[0 : n - 1]
return item
}
func main() {
taskQueue := &TaskQueue{}
heap.Init(taskQueue)
// 向任务队列添加任务
task1 := &Task{ID: 1, Priority: 3}
task2 := &Task{ID: 2, Priority: 1}
task3 := &Task{ID: 3, Priority: 2}
heap.Push(taskQueue, task1)
heap.Push(taskQueue, task2)
heap.Push(taskQueue, task3)
// 定义任务执行通道和结果通道
taskCh := make(chan *Task)
resultCh := make(chan error)
// 启动工作协程
numWorkers := 3
for i := 0; i < numWorkers; i++ {
go worker(taskCh, resultCh)
}
// 调度任务
go func() {
for taskQueue.Len() > 0 {
task := heap.Pop(taskQueue).(*Task)
taskCh <- task
}
close(taskCh)
}()
// 处理任务执行结果
for i := 0; i < numWorkers; i++ {
err := <-resultCh
if err != nil {
fmt.Printf("Task execution error: %v\n", err)
}
}
close(resultCh)
}
func worker(taskCh <-chan *Task, resultCh chan<- error) {
for task := range taskCh {
// 模拟任务执行
err := executeTask(task)
resultCh <- err
}
}
func executeTask(task *Task) error {
// 实际任务执行逻辑,这里简单返回nil
return nil
}
错误处理
- 任务执行函数:在
executeTask
函数中,在任务执行失败时返回错误信息,如return fmt.Errorf("task %d failed", task.ID)
。
- 结果通道监听:在调度器(
main
函数中的for i := 0; i < numWorkers; i++
循环部分)通过resultCh
接收错误信息,根据具体业务需求进行处理,如记录日志、重试任务。
- 重试逻辑:如果需要重试任务,可以在接收到错误后,将任务重新添加到优先级队列
taskQueue
中,然后重新调度执行。例如:
for i := 0; i < numWorkers; i++ {
err := <-resultCh
if err != nil {
fmt.Printf("Task execution error: %v\n", err)
// 重新添加任务到队列
heap.Push(taskQueue, &Task{ID: 1, Priority: 3})
}
}