面试题答案
一键面试背压产生的场景
在异步数据流处理中,当生产者(发射数据的一方)产生数据的速度远远快于消费者(收集数据的一方)处理数据的速度时,就会产生背压。例如在Flow中,一个Flow可能会快速发射大量数据,而下游的collect操作由于某些原因(如复杂的业务逻辑处理、I/O操作等)无法及时处理这些数据,这时就会出现背压问题。如果不处理背压,数据可能会在缓冲区堆积,最终导致内存溢出。
Flow提供的处理背压策略
- BUFFER:这是默认策略。它会使用一个缓冲区来暂存生产者发射的数据。缓冲区默认大小为64,如果缓冲区满了,生产者会暂停发射数据,直到消费者从缓冲区中取出数据,腾出空间。
- DROP:在这种策略下,当缓冲区满了,生产者继续发射数据时,新的数据会直接被丢弃,不会进入缓冲区。这意味着部分数据会丢失,但可以避免缓冲区无限增长导致内存溢出。
- LATEST:此策略下,缓冲区始终只保留最新发射的数据。当缓冲区满了,新的数据会替换掉缓冲区中已有的数据。消费者始终收集到的是最新的数据,同样会丢弃旧数据。
示例代码
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val flow = flow {
var i = 0
while (true) {
emit(i++)
delay(10) // 模拟快速发射数据
}
}.flowOn(flowOn = kotlinx.coroutines.Dispatchers.Default)
flow
.onBackpressureDrop { dropped ->
println("Dropped $dropped")
}
.collect { value ->
delay(100) // 模拟慢速收集数据
println("Collected $value")
}
}
在上述代码中,flow
以每10毫秒的频率发射数据,而 collect
每100毫秒才处理一个数据,模拟了高发射频率和低收集频率的场景。通过 onBackpressureDrop
策略,当缓冲区满了新数据会被丢弃,并打印出被丢弃的数据,以此来处理背压,避免内存溢出。如果想使用其他策略,只需将 onBackpressureDrop
替换为 onBackpressureBuffer
(BUFFER策略) 或 onBackpressureLatest
(LATEST策略) 即可。