面试题答案
一键面试背压处理方式
- 使用
buffer
操作符:buffer
操作符会创建一个缓冲区来存储上游发送的数据,当下游准备好时,再从缓冲区中取出数据。这样可以防止数据丢失,但是如果缓冲区填满,上游数据仍然会积压。 - 使用
onBackpressureBuffer
操作符:与buffer
类似,onBackpressureBuffer
专门用于处理背压,它会在下游处理不过来时将数据缓冲起来。可以设置缓冲区大小,当缓冲区满时,根据配置的策略进行处理,如丢弃最新数据或者抛出异常。 - 使用
onBackpressureDrop
操作符:当下游处理速度跟不上上游时,直接丢弃新的数据,以防止数据积压。 - 使用
onBackpressureLatest
操作符:只保留最新的数据,丢弃其他旧数据,当下游准备好时,处理最新的数据。
代码示例
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
fun main() {
Flux.range(1, 1000)
.publishOn(Schedulers.parallel())
.log()
.onBackpressureBuffer(100) // 设置缓冲区大小为100
.subscribeOn(Schedulers.parallel())
.subscribe { value ->
println("Consumed: $value")
Thread.sleep(100) // 模拟下游消费速度慢
}
Thread.sleep(5000)
}
在上述示例中:
Flux.range(1, 1000)
创建一个从1到1000的数据流。publishOn(Schedulers.parallel())
将数据发布到并行调度器,模拟异步生产数据。log()
用于打印数据流的操作日志。onBackpressureBuffer(100)
设置缓冲区大小为100,处理背压。subscribeOn(Schedulers.parallel())
将订阅操作也放在并行调度器。subscribe
中的Thread.sleep(100)
模拟下游消费速度慢,通过设置缓冲区,防止数据丢失。