设计思路
- 异常处理:在生产者中捕获资源耗尽异常,一旦捕获到异常,停止向共享缓冲通道发送数据。
- 信号传递:通过一个单独的
done
通道来通知所有消费者,某个生产者出现异常,并且所有生产者都已完成数据发送。
- 资源协调:每个生产者完成工作后,向一个
producerDone
通道发送信号,当所有生产者都发送了信号,且某个生产者出现异常时,关闭done
通道,告知消费者可以处理剩余数据并退出。
关键代码片段(以Go语言为例)
package main
import (
"fmt"
"sync"
)
func main() {
const bufferSize = 10
sharedBuffer := make(chan int, bufferSize)
producerDone := make(chan struct{})
done := make(chan struct{})
var wg sync.WaitGroup
// 生产者1
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 10; i++ {
if i == 5 {
// 模拟资源耗尽异常
fmt.Println("Producer 1: Resource exhausted")
close(producerDone)
return
}
sharedBuffer <- i
}
producerDone <- struct{}{}
}()
// 生产者2
wg.Add(1)
go func() {
defer wg.Done()
for i := 10; i < 20; i++ {
sharedBuffer <- i
}
producerDone <- struct{}{}
}()
// 消费者
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case data, ok := <-sharedBuffer:
if!ok {
return
}
fmt.Printf("Consumer received: %d\n", data)
case <-done:
for data := range sharedBuffer {
fmt.Printf("Consumer handling remaining: %d\n", data)
}
return
}
}
}()
}
go func() {
wg.Wait()
close(producerDone)
}()
go func() {
numProducers := 2
for i := 0; i < numProducers; i++ {
<-producerDone
}
close(done)
close(sharedBuffer)
}()
select {}
}
- 生产者部分:生产者在出现异常(这里模拟为
i == 5
)时,关闭producerDone
通道并返回。正常完成工作时,向producerDone
通道发送信号。
- 消费者部分:消费者通过
select
语句监听sharedBuffer
和done
通道。当done
通道接收到信号时,处理sharedBuffer
中的剩余数据,然后退出。
- 资源协调部分:使用
wg
等待组等待所有生产者完成工作,当所有生产者都完成(包括出现异常的情况),关闭done
通道和sharedBuffer
通道,使消费者可以正确处理剩余数据并优雅退出。