设计思路
- 任务依赖管理:使用一个数据结构(如 map)来表示任务之间的依赖关系。每个任务可以有零个或多个依赖任务。
- 任务执行状态跟踪:利用
sync.Once
确保每个任务在整个分布式系统中只被初始化一次。
- 并发执行:对于没有依赖关系的任务,可以并发执行,使用
sync.WaitGroup
来等待所有任务完成。channels
可以用于传递任务执行的结果或者控制信号。
核心代码实现
package main
import (
"fmt"
"sync"
)
// Task 定义任务类型
type Task func() error
// TaskGraph 定义任务图,包含任务及其依赖关系
type TaskGraph struct {
tasks map[string]Task
deps map[string][]string
}
// NewTaskGraph 创建新的任务图
func NewTaskGraph() *TaskGraph {
return &TaskGraph{
tasks: make(map[string]Task),
deps: make(map[string][]string),
}
}
// AddTask 添加任务及其依赖
func (tg *TaskGraph) AddTask(name string, task Task, depNames...string) {
tg.tasks[name] = task
tg.deps[name] = depNames
}
// Execute 执行任务图
func (tg *TaskGraph) Execute() error {
var wg sync.WaitGroup
var once sync.Once
resultCh := make(chan error, len(tg.tasks))
executed := make(map[string]bool)
var executeTask func(string)
executeTask = func(name string) {
once.Do(func() {
if executed[name] {
return
}
deps := tg.deps[name]
for _, dep := range deps {
executeTask(dep)
}
wg.Add(1)
go func() {
defer wg.Done()
task := tg.tasks[name]
err := task()
if err != nil {
resultCh <- err
}
executed[name] = true
}()
})
}
for name := range tg.tasks {
executeTask(name)
}
go func() {
wg.Wait()
close(resultCh)
}()
for err := range resultCh {
if err != nil {
return err
}
}
return nil
}
使用示例
func main() {
tg := NewTaskGraph()
tg.AddTask("task1", func() error {
fmt.Println("Executing task1")
return nil
})
tg.AddTask("task2", func() error {
fmt.Println("Executing task2")
return nil
}, "task1")
err := tg.Execute()
if err != nil {
fmt.Println("Error:", err)
}
}