实现思路
- 创建Channel:使用
Channel
来作为生产者和消费者之间通信的桥梁。根据需求,选择合适的容量,例如Channel.UNLIMITED
表示无界队列,Channel.CONFLATED
表示只保留最新值等。
- 生产者协程:每个生产者协程不断生成数据,并通过
send
方法将数据发送到Channel
中。
- 消费者协程:每个消费者协程通过
receive
方法从Channel
中接收数据并处理。由于Channel
本身保证了数据的顺序性,所以无需额外处理数据顺序问题。
- 资源合理利用:可以使用
CoroutineScope
来管理协程的生命周期,确保在不需要时能够正确取消协程,释放资源。
核心代码片段
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
fun main() = runBlocking<Unit> {
// 创建一个Channel
val channel = Channel<Int>()
// 生产者协程
val producer1 = launch {
for (i in 1..5) {
channel.send(i)
delay(100) // 模拟生产数据的耗时
}
}
val producer2 = launch {
for (i in 6..10) {
channel.send(i)
delay(150) // 模拟生产数据的耗时
}
}
// 消费者协程
val consumer1 = launch {
for (i in 1..10) {
val data = channel.receive()
println("Consumer1 received: $data")
delay(200) // 模拟处理数据的耗时
}
}
val consumer2 = launch {
for (i in 1..10) {
val data = channel.receive()
println("Consumer2 received: $data")
delay(250) // 模拟处理数据的耗时
}
}
// 等待所有生产者和消费者完成
producer1.join()
producer2.join()
consumer1.join()
consumer2.join()
}