1. 背压策略之 BUFFER
- 应用场景:适用于生产者和消费者速度差异不是特别巨大,且有一定的内存可以用于缓冲数据的场景。在这种场景下,
BUFFER
策略可以通过缓冲一定数量的数据,避免生产者因消费者处理过慢而阻塞。
- 实现方式:在构建
Flow
时使用 flowOn
函数指定调度器,并设置 BufferOverflow
为 BUFFER
。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = flow {
for (i in 1..10) {
delay(100) // 模拟生产者生产数据较慢
emit(i)
}
}.flowOn(Dispatchers.Default, BufferOverflow.BUFFER)
flow.collect {
delay(200) // 模拟消费者处理数据更慢
println(it)
}
}
2. 背压策略之 DROP_OLDEST
- 应用场景:当生产者生成数据速度远快于消费者,且新数据比旧数据更重要,可丢弃旧数据以保证消费者能及时处理最新数据时使用。例如在实时数据更新场景中,新的状态比旧状态更有意义。
- 实现方式:同样在
flowOn
函数中指定调度器,并设置 BufferOverflow
为 DROP_OLDEST
。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default, BufferOverflow.DROP_OLDEST)
flow.collect {
delay(500)
println(it)
}
}
3. 背压策略之 DROP_LATEST
- 应用场景:当消费者处理速度远远落后于生产者,且只关心最新数据,不希望处理中间积压的数据时使用。例如在股票价格实时更新,用户只关心最新股价的场景。
- 实现方式:在
flowOn
函数中设置 BufferOverflow
为 DROP_LATEST
。
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val flow = flow {
for (i in 1..10) {
delay(100)
emit(i)
}
}.flowOn(Dispatchers.Default, BufferOverflow.DROP_LATEST)
flow.collect {
delay(500)
println(it)
}
}