package main
import (
"fmt"
"sync"
)
// 模拟数据源读取数据
func readData(dataChan chan int) {
for i := 1; i <= 10; i++ {
dataChan <- i
}
close(dataChan)
}
// 模拟数据处理阶段
func processData(dataChan <-chan int, processedChan chan int) {
for data := range dataChan {
processedData := data * 2
processedChan <- processedData
}
close(processedChan)
}
// 模拟数据写入存储阶段
func writeData(processedChan <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for data := range processedChan {
fmt.Println("Write to storage:", data)
}
}
func main() {
dataChan := make(chan int)
processedChan := make(chan int)
var wg sync.WaitGroup
wg.Add(1)
go readData(dataChan)
go processData(dataChan, processedChan)
go writeData(processedChan, &wg)
wg.Wait()
close(processedChan)
}
- 数据源读取阶段:
readData
函数向 dataChan
通道发送数据,完成后关闭通道。
- 数据处理阶段:
processData
函数从 dataChan
通道接收数据,进行处理(这里简单地乘以2),并将处理后的数据发送到 processedChan
通道,完成后关闭通道。
- 数据写入存储阶段:
writeData
函数从 processedChan
通道接收数据并模拟写入存储操作,使用 sync.WaitGroup
确保所有操作完成后程序退出。
- 主函数:创建两个通道
dataChan
和 processedChan
,启动三个 goroutine 分别执行上述三个阶段的任务,使用 sync.WaitGroup
同步,确保 writeData
完成后关闭 processedChan
通道。