MST

星途 面试题库

面试题:Go sync.Once 与复杂并发任务调度的融合

假设有一个分布式系统,其中的每个节点需要执行一系列复杂的初始化任务,这些任务之间存在依赖关系,且部分任务需要在并发环境下执行。请利用 Go 的 sync.Once 结合其他并发原语(如 channels、sync.WaitGroup 等)设计一个通用的初始化框架,确保任务按依赖顺序正确执行,且在整个分布式系统中每个任务只被初始化一次。阐述设计思路并给出核心代码实现。
46.7万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务依赖管理:使用一个数据结构(如 map)来表示任务之间的依赖关系。每个任务可以有零个或多个依赖任务。
  2. 任务执行状态跟踪:利用 sync.Once 确保每个任务在整个分布式系统中只被初始化一次。
  3. 并发执行:对于没有依赖关系的任务,可以并发执行,使用 sync.WaitGroup 来等待所有任务完成。channels 可以用于传递任务执行的结果或者控制信号。

核心代码实现

package main

import (
    "fmt"
    "sync"
)

// Task 定义任务类型
type Task func() error

// TaskGraph 定义任务图,包含任务及其依赖关系
type TaskGraph struct {
    tasks map[string]Task
    deps  map[string][]string
}

// NewTaskGraph 创建新的任务图
func NewTaskGraph() *TaskGraph {
    return &TaskGraph{
        tasks: make(map[string]Task),
        deps:  make(map[string][]string),
    }
}

// AddTask 添加任务及其依赖
func (tg *TaskGraph) AddTask(name string, task Task, depNames...string) {
    tg.tasks[name] = task
    tg.deps[name] = depNames
}

// Execute 执行任务图
func (tg *TaskGraph) Execute() error {
    var wg sync.WaitGroup
    var once sync.Once
    resultCh := make(chan error, len(tg.tasks))
    executed := make(map[string]bool)

    var executeTask func(string)

    executeTask = func(name string) {
        once.Do(func() {
            if executed[name] {
                return
            }
            deps := tg.deps[name]
            for _, dep := range deps {
                executeTask(dep)
            }
            wg.Add(1)
            go func() {
                defer wg.Done()
                task := tg.tasks[name]
                err := task()
                if err != nil {
                    resultCh <- err
                }
                executed[name] = true
            }()
        })
    }

    for name := range tg.tasks {
        executeTask(name)
    }

    go func() {
        wg.Wait()
        close(resultCh)
    }()

    for err := range resultCh {
        if err != nil {
            return err
        }
    }

    return nil
}

使用示例

func main() {
    tg := NewTaskGraph()
    tg.AddTask("task1", func() error {
        fmt.Println("Executing task1")
        return nil
    })
    tg.AddTask("task2", func() error {
        fmt.Println("Executing task2")
        return nil
    }, "task1")

    err := tg.Execute()
    if err != nil {
        fmt.Println("Error:", err)
    }
}