MST

星途 面试题库

面试题:Go语言Goroutine扇出扇入模式的应用

假设你有一个需要处理大量文件的任务,每个文件处理逻辑较为复杂。请使用Goroutine的扇出扇入模式设计并实现一个解决方案,以提高处理效率。说明扇出和扇入阶段分别做了什么,以及如何处理可能出现的错误。
33.2万 热度难度
编程语言Go

知识考点

AI 面试

面试题答案

一键面试

扇出(Fan - Out)阶段

  1. 作用
    • 将输入的文件任务分发给多个Goroutine并行处理。这样可以利用多核CPU的优势,大大提高处理效率。例如,假设有100个文件需要处理,扇出阶段可以将这100个文件分配给10个Goroutine,每个Goroutine处理10个文件。
  2. 实现方式
    • 可以使用一个通道(chan)来传递需要处理的文件路径或文件对象。然后启动多个Goroutine,每个Goroutine从这个通道中读取文件任务进行处理。
    • 示例代码如下:
package main

import (
    "fmt"
)

func fanOut(filePaths <-chan string, numWorkers int, resultChan chan<- string) {
    for i := 0; i < numWorkers; i++ {
        go func() {
            for filePath := range filePaths {
                // 处理文件逻辑,这里简单返回文件路径作为结果
                result := processFile(filePath)
                resultChan <- result
            }
        }()
    }
}

func processFile(filePath string) string {
    // 实际的复杂文件处理逻辑
    return fmt.Sprintf("Processed %s", filePath)
}

扇入(Fan - In)阶段

  1. 作用
    • 收集扇出阶段各个Goroutine处理后的结果,并进行汇总。例如,将10个Goroutine处理文件后的结果收集起来,再进行统一的后续处理(如写入日志、返回给调用者等)。
  2. 实现方式
    • 通常使用一个或多个通道来接收扇出阶段各个Goroutine发送的处理结果。可以使用sync.WaitGroup来等待所有扇出的Goroutine完成任务。
    • 示例代码如下:
func fanIn(resultChan <-chan string, numWorkers int, finalResultChan chan<- []string) {
    var wg sync.WaitGroup
    wg.Add(numWorkers)
    var results []string
    for i := 0; i < numWorkers; i++ {
        go func() {
            defer wg.Done()
            for result := range resultChan {
                results = append(results, result)
            }
        }()
    }
    go func() {
        wg.Wait()
        close(finalResultChan)
        finalResultChan <- results
    }()
}

错误处理

  1. 扇出阶段错误处理
    • 在处理文件的函数(如processFile)中,可以返回错误信息。然后在Goroutine内部,将错误信息发送到一个专门的错误通道。
    • 示例修改processFile函数和fanOut函数如下:
func processFile(filePath string) (string, error) {
    // 实际的复杂文件处理逻辑,这里简单返回文件路径作为结果
    if filePath == "" {
        return "", fmt.Errorf("invalid file path")
    }
    return fmt.Sprintf("Processed %s", filePath), nil
}

func fanOut(filePaths <-chan string, numWorkers int, resultChan chan<- string, errorChan chan<- error) {
    for i := 0; i < numWorkers; i++ {
        go func() {
            for filePath := range filePaths {
                result, err := processFile(filePath)
                if err != nil {
                    errorChan <- err
                } else {
                    resultChan <- result
                }
            }
        }()
    }
}
  1. 扇入阶段错误处理
    • 在扇入阶段,可以从错误通道接收错误信息,并进行相应处理。例如,可以记录错误日志,或者根据错误情况决定是否继续处理后续结果。
    • 示例修改fanIn函数如下:
func fanIn(resultChan <-chan string, errorChan <-chan error, numWorkers int, finalResultChan chan<- []string) {
    var wg sync.WaitGroup
    wg.Add(numWorkers)
    var results []string
    go func() {
        for err := range errorChan {
            fmt.Println("Error:", err)
        }
    }()
    for i := 0; i < numWorkers; i++ {
        go func() {
            defer wg.Done()
            for result := range resultChan {
                results = append(results, result)
            }
        }()
    }
    go func() {
        wg.Wait()
        close(finalResultChan)
        finalResultChan <- results
    }()
}

在主函数中,可以这样调用:

func main() {
    filePaths := make(chan string)
    resultChan := make(chan string)
    errorChan := make(chan error)
    finalResultChan := make(chan []string)
    numWorkers := 3
    // 模拟传入文件路径
    go func() {
        filePaths <- "file1.txt"
        filePaths <- "file2.txt"
        filePaths <- "file3.txt"
        close(filePaths)
    }()
    fanOut(filePaths, numWorkers, resultChan, errorChan)
    fanIn(resultChan, errorChan, numWorkers, finalResultChan)
    for result := range finalResultChan {
        fmt.Println("Final Results:", result)
    }
    close(resultChan)
    close(errorChan)
}

以上代码展示了如何使用Goroutine的扇出扇入模式处理大量文件,并处理可能出现的错误。实际应用中,processFile函数应替换为真实的复杂文件处理逻辑。