1. 扇入(Fan - In)
- 模式说明:将多个输入通道的数据合并到一个输出通道。常用于从多个数据源收集数据,然后统一处理。
- 示例代码:
package main
import (
"fmt"
)
func producer(id int, out chan int) {
for i := 0; i < 5; i++ {
out <- id*10 + i
}
close(out)
}
func fanIn(input1, input2 chan int, output chan int) {
for {
select {
case val, ok := <-input1:
if!ok {
input1 = nil
continue
}
output <- val
case val, ok := <-input2:
if!ok {
input2 = nil
continue
}
output <- val
}
if input1 == nil && input2 == nil {
close(output)
return
}
}
}
func main() {
ch1 := make(chan int)
ch2 := make(chan int)
result := make(chan int)
go producer(1, ch1)
go producer(2, ch2)
go fanIn(ch1, ch2, result)
for val := range result {
fmt.Println(val)
}
}
2. 扇出(Fan - Out)
- 模式说明:将一个输入通道的数据分发到多个输出通道,由多个协程并行处理。可以加快数据处理速度,充分利用多核CPU。
- 示例代码:
package main
import (
"fmt"
)
func fanOut(in chan int, out1, out2 chan int) {
for val := range in {
select {
case out1 <- val:
case out2 <- val:
}
}
close(out1)
close(out2)
}
func consumer(id int, in chan int) {
for val := range in {
fmt.Printf("Consumer %d received %d\n", id, val)
}
}
func main() {
input := make(chan int)
output1 := make(chan int)
output2 := make(chan int)
go fanOut(input, output1, output2)
go consumer(1, output1)
go consumer(2, output2)
for i := 0; i < 10; i++ {
input <- i
}
close(input)
select {}
}
3. 流水线(Pipeline)
- 模式说明:数据像在流水线上一样,依次经过多个处理阶段,每个阶段由一个或多个协程处理。每个阶段之间通过通道连接。
- 示例代码:
package main
import (
"fmt"
)
func stage1(in chan int, out chan int) {
for val := range in {
out <- val * 2
}
close(out)
}
func stage2(in chan int, out chan int) {
for val := range in {
out <- val + 3
}
close(out)
}
func main() {
start := make(chan int)
stage1Out := make(chan int)
final := make(chan int)
go stage1(start, stage1Out)
go stage2(stage1Out, final)
for i := 0; i < 5; i++ {
start <- i
}
close(start)
for result := range final {
fmt.Println(result)
}
}