面试题答案
一键面试扇出(Fan - Out)阶段
- 作用:
- 将输入的文件任务分发给多个Goroutine并行处理。这样可以利用多核CPU的优势,大大提高处理效率。例如,假设有100个文件需要处理,扇出阶段可以将这100个文件分配给10个Goroutine,每个Goroutine处理10个文件。
- 实现方式:
- 可以使用一个通道(
chan
)来传递需要处理的文件路径或文件对象。然后启动多个Goroutine,每个Goroutine从这个通道中读取文件任务进行处理。 - 示例代码如下:
- 可以使用一个通道(
package main
import (
"fmt"
)
func fanOut(filePaths <-chan string, numWorkers int, resultChan chan<- string) {
for i := 0; i < numWorkers; i++ {
go func() {
for filePath := range filePaths {
// 处理文件逻辑,这里简单返回文件路径作为结果
result := processFile(filePath)
resultChan <- result
}
}()
}
}
func processFile(filePath string) string {
// 实际的复杂文件处理逻辑
return fmt.Sprintf("Processed %s", filePath)
}
扇入(Fan - In)阶段
- 作用:
- 收集扇出阶段各个Goroutine处理后的结果,并进行汇总。例如,将10个Goroutine处理文件后的结果收集起来,再进行统一的后续处理(如写入日志、返回给调用者等)。
- 实现方式:
- 通常使用一个或多个通道来接收扇出阶段各个Goroutine发送的处理结果。可以使用
sync.WaitGroup
来等待所有扇出的Goroutine完成任务。 - 示例代码如下:
- 通常使用一个或多个通道来接收扇出阶段各个Goroutine发送的处理结果。可以使用
func fanIn(resultChan <-chan string, numWorkers int, finalResultChan chan<- []string) {
var wg sync.WaitGroup
wg.Add(numWorkers)
var results []string
for i := 0; i < numWorkers; i++ {
go func() {
defer wg.Done()
for result := range resultChan {
results = append(results, result)
}
}()
}
go func() {
wg.Wait()
close(finalResultChan)
finalResultChan <- results
}()
}
错误处理
- 扇出阶段错误处理:
- 在处理文件的函数(如
processFile
)中,可以返回错误信息。然后在Goroutine内部,将错误信息发送到一个专门的错误通道。 - 示例修改
processFile
函数和fanOut
函数如下:
- 在处理文件的函数(如
func processFile(filePath string) (string, error) {
// 实际的复杂文件处理逻辑,这里简单返回文件路径作为结果
if filePath == "" {
return "", fmt.Errorf("invalid file path")
}
return fmt.Sprintf("Processed %s", filePath), nil
}
func fanOut(filePaths <-chan string, numWorkers int, resultChan chan<- string, errorChan chan<- error) {
for i := 0; i < numWorkers; i++ {
go func() {
for filePath := range filePaths {
result, err := processFile(filePath)
if err != nil {
errorChan <- err
} else {
resultChan <- result
}
}
}()
}
}
- 扇入阶段错误处理:
- 在扇入阶段,可以从错误通道接收错误信息,并进行相应处理。例如,可以记录错误日志,或者根据错误情况决定是否继续处理后续结果。
- 示例修改
fanIn
函数如下:
func fanIn(resultChan <-chan string, errorChan <-chan error, numWorkers int, finalResultChan chan<- []string) {
var wg sync.WaitGroup
wg.Add(numWorkers)
var results []string
go func() {
for err := range errorChan {
fmt.Println("Error:", err)
}
}()
for i := 0; i < numWorkers; i++ {
go func() {
defer wg.Done()
for result := range resultChan {
results = append(results, result)
}
}()
}
go func() {
wg.Wait()
close(finalResultChan)
finalResultChan <- results
}()
}
在主函数中,可以这样调用:
func main() {
filePaths := make(chan string)
resultChan := make(chan string)
errorChan := make(chan error)
finalResultChan := make(chan []string)
numWorkers := 3
// 模拟传入文件路径
go func() {
filePaths <- "file1.txt"
filePaths <- "file2.txt"
filePaths <- "file3.txt"
close(filePaths)
}()
fanOut(filePaths, numWorkers, resultChan, errorChan)
fanIn(resultChan, errorChan, numWorkers, finalResultChan)
for result := range finalResultChan {
fmt.Println("Final Results:", result)
}
close(resultChan)
close(errorChan)
}
以上代码展示了如何使用Goroutine的扇出扇入模式处理大量文件,并处理可能出现的错误。实际应用中,processFile
函数应替换为真实的复杂文件处理逻辑。