MST

星途 面试题库

面试题:Go语言WaitGroup在复杂并发场景下的应用

假设有一个分布式系统,其中有多个服务节点,每个节点需要执行一系列不同的子任务(如数据获取、数据处理、结果存储),并且这些子任务之间存在依赖关系。你需要使用WaitGroup来协调这些节点和子任务的并发执行,确保在所有相关任务完成后进行系统状态的更新。请描述设计思路,并给出关键部分的代码示例,同时说明如何处理可能出现的错误。
45.2万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

设计思路

  1. 任务划分:将每个服务节点的一系列子任务明确划分,每个子任务定义为一个独立的函数,并且确定它们之间的依赖关系。
  2. WaitGroup 使用:为每个服务节点创建一个 WaitGroup 实例,用于等待该节点上所有子任务完成。当一个子任务启动时,调用 WaitGroupAdd(1) 方法;当子任务完成时,调用 Done() 方法。在每个节点的任务调度函数中,使用 Wait() 方法等待所有子任务完成。
  3. 错误处理:每个子任务在执行过程中如果发生错误,将错误返回。在主调度函数中收集这些错误,并在所有任务完成后统一处理。可以通过定义一个全局的错误通道或者在每个节点的任务列表中增加错误字段来收集错误。

关键部分代码示例

package main

import (
    "fmt"
    "sync"
)

// 模拟数据获取任务
func fetchData(wg *sync.WaitGroup, result *string, errChan chan error) {
    defer wg.Done()
    // 模拟数据获取操作
    data, err := getSomeData()
    if err != nil {
        errChan <- err
        return
    }
    *result = data
}

// 模拟数据处理任务
func processData(wg *sync.WaitGroup, input string, result *string, errChan chan error) {
    defer wg.Done()
    // 模拟数据处理操作
    processedData, err := process(input)
    if err != nil {
        errChan <- err
        return
    }
    *result = processedData
}

// 模拟结果存储任务
func storeResult(wg *sync.WaitGroup, data string, errChan chan error) {
    defer wg.Done()
    // 模拟结果存储操作
    err := store(data)
    if err != nil {
        errChan <- err
        return
    }
}

func main() {
    var wg sync.WaitGroup
    errChan := make(chan error)
    dataResult := ""
    processedResult := ""

    // 数据获取任务
    wg.Add(1)
    go fetchData(&wg, &dataResult, errChan)

    // 等待数据获取完成
    wg.Wait()
    if err := <-errChan; err != nil {
        fmt.Println("Data fetching error:", err)
        return
    }

    // 数据处理任务
    wg.Add(1)
    go processData(&wg, dataResult, &processedResult, errChan)

    // 等待数据处理完成
    wg.Wait()
    if err := <-errChan; err != nil {
        fmt.Println("Data processing error:", err)
        return
    }

    // 结果存储任务
    wg.Add(1)
    go storeResult(&wg, processedResult, errChan)

    // 等待所有任务完成
    wg.Wait()
    if err := <-errChan; err != nil {
        fmt.Println("Result storing error:", err)
        return
    }

    // 所有任务成功完成,更新系统状态
    fmt.Println("All tasks completed successfully, updating system state...")
}

// 模拟数据获取函数
func getSomeData() (string, error) {
    // 实际数据获取逻辑
    return "fetched data", nil
}

// 模拟数据处理函数
func process(input string) (string, error) {
    // 实际数据处理逻辑
    return "processed " + input, nil
}

// 模拟结果存储函数
func store(data string) error {
    // 实际结果存储逻辑
    fmt.Println("Storing data:", data)
    return nil
}

错误处理说明

  1. 子任务错误返回:每个子任务(如 fetchDataprocessDatastoreResult)在发生错误时,将错误发送到错误通道 errChan
  2. 主调度函数处理:在主调度函数中,每次等待某个子任务完成后,从错误通道中读取错误。如果有错误,打印错误信息并终止后续任务的执行。这样可以确保在任何一个子任务出错时,系统不会继续执行可能无效的后续任务,并且能够及时反馈错误信息。同时,错误通道可以确保在并发环境下,错误信息能够被正确收集和处理。