package main
import (
"context"
"fmt"
"sync"
"time"
)
func sendData(ctx context.Context, wg *sync.WaitGroup, dataCh chan<- int, id int) {
defer wg.Done()
for i := 0; i < 5; i++ {
select {
case <-ctx.Done():
return
case dataCh <- id*10 + i:
fmt.Printf("Sender %d sent %d\n", id, id*10 + i)
time.Sleep(time.Millisecond * 200)
case <-time.After(time.Second):
fmt.Printf("Sender %d timeout\n", id)
}
}
close(dataCh)
}
func receiveData(ctx context.Context, wg *sync.WaitGroup, dataCh <-chan int, id int) {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case data, ok := <-dataCh:
if!ok {
fmt.Printf("Receiver %d: channel closed\n", id)
return
}
fmt.Printf("Receiver %d received %d\n", id, data)
case <-time.After(time.Second):
fmt.Printf("Receiver %d timeout\n", id)
}
}
}
func main() {
var wg sync.WaitGroup
dataCh := make(chan int)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
numSenders := 3
numReceivers := 2
for i := 0; i < numSenders; i++ {
wg.Add(1)
go sendData(ctx, &wg, dataCh, i)
}
for i := 0; i < numReceivers; i++ {
wg.Add(1)
go receiveData(ctx, &wg, dataCh, i)
}
wg.Wait()
close(dataCh)
}
- 发送数据:
sendData
函数负责向管道 dataCh
发送数据。
- 使用
context.Context
来处理超时或取消操作。如果 ctx.Done()
被触发,函数会停止发送数据。
- 使用
select
语句结合 time.After
来处理发送数据的超时情况。
- 发送完成后关闭管道。
- 接收数据:
receiveData
函数负责从管道 dataCh
接收数据。
- 同样使用
context.Context
来处理超时或取消操作。
- 使用
select
语句结合 time.After
来处理接收数据的超时情况。
- 通过
ok
检查管道是否关闭,当管道关闭时,函数停止接收数据。
- 主函数:
- 创建一个带有超时的
context.Context
。
- 创建一个数据管道
dataCh
。
- 启动多个发送者和接收者 goroutine,并使用
sync.WaitGroup
等待所有 goroutine 完成。
- 主函数结束前关闭管道。