package main
import (
"fmt"
"sync"
)
// 生成整数序列的协程
func generateSequence(id int, out chan<- int) {
for i := id * 10; i < (id + 1) * 10; i++ {
out <- i
}
close(out)
}
// 扇入模式,合并多个整数序列
func fanIn(inputs []<-chan int, out chan<- int) {
var wg sync.WaitGroup
wg.Add(len(inputs))
for _, in := range inputs {
go func(c <-chan int) {
defer wg.Done()
for val := range c {
out <- val
}
}(in)
}
go func() {
wg.Wait()
close(out)
}()
}
// 平方计算的协程
func squareWorker(id int, in <-chan int, out chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for val := range in {
out <- val * val
}
}
// 扇出模式,将序列平均分配给处理协程
func fanOut(in <-chan int, numWorkers int, out []chan<- int) {
var wg sync.WaitGroup
wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
go squareWorker(i, in, out[i], &wg)
}
go func() {
wg.Wait()
for _, o := range out {
close(o)
}
}()
}
func main() {
// 假设有3个协程生成序列
numGenerators := 3
inputChannels := make([]<-chan int, numGenerators)
for i := 0; i < numGenerators; i++ {
ch := make(chan int)
go generateSequence(i, ch)
inputChannels[i] = ch
}
// 合并所有序列
mergedChannel := make(chan int)
go fanIn(inputChannels, mergedChannel)
// 假设有2个处理协程进行平方计算
numWorkers := 2
outputChannels := make([]chan<- int, numWorkers)
for i := 0; i < numWorkers; i++ {
ch := make(chan int)
outputChannels[i] = ch
}
go fanOut(mergedChannel, numWorkers, outputChannels)
// 收集并打印结果
var finalResult []int
for _, out := range outputChannels {
for val := range out {
finalResult = append(finalResult, val)
}
}
fmt.Println(finalResult)
}