设计思路
- 第一阶段:
- 使用
WaitGroup
来等待第一阶段所有 goroutine
完成数据读取和处理。每个 goroutine
负责从一个数据源读取数据并处理,处理结果暂存。
- 遍历数据源列表,为每个数据源启动一个
goroutine
,并在启动 goroutine
前调用 wg.Add(1)
,在 goroutine
结束时调用 wg.Done()
。
- 第二阶段:
- 第一阶段所有
goroutine
完成后(通过 wg.Wait()
等待),获取第一阶段的处理结果。
- 再次使用
WaitGroup
来等待第二阶段所有 goroutine
完成对第一阶段结果的处理。为每个第一阶段的结果启动一个 goroutine
进行第二阶段处理,同样在启动 goroutine
前调用 wg.Add(1)
,在 goroutine
结束时调用 wg.Done()
。
关键代码
package main
import (
"fmt"
"sync"
)
// 模拟从数据源读取数据并处理的函数
func processDataFromSource(source int, wg *sync.WaitGroup, resultChan chan int) {
defer wg.Done()
// 模拟数据处理
data := source * 2
resultChan <- data
}
// 模拟对第一阶段结果进行处理的函数
func processResult(result int, wg *sync.WaitGroup) {
defer wg.Done()
// 模拟第二阶段数据处理
processedResult := result + 1
fmt.Printf("Processed result: %d\n", processedResult)
}
func main() {
var wg1, wg2 sync.WaitGroup
resultChan := make(chan int)
sources := []int{1, 2, 3, 4}
// 第一阶段
for _, source := range sources {
wg1.Add(1)
go processDataFromSource(source, &wg1, resultChan)
}
go func() {
wg1.Wait()
close(resultChan)
}()
// 第二阶段
for result := range resultChan {
wg2.Add(1)
go processResult(result, &wg2)
}
wg2.Wait()
}