面试题答案
一键面试背压产生的原因
- 上下游流速不匹配:在Reactive编程中,数据是异步流动的。上游可能以非常快的速度产生数据,而下游由于各种原因(如复杂的计算、I/O操作等)无法以同样的速度处理数据。例如,一个传感器可能每秒产生数千个数据点,而处理这些数据的模块可能每秒只能处理几百个。
- 缓冲区限制:如果没有足够的缓冲区来暂存数据,当上游产生数据的速度超过下游处理速度时,就会导致数据积压,进而产生背压。
背压带来的问题
- 内存溢出:如果背压没有得到妥善处理,数据不断积压在缓冲区中,随着时间的推移,缓冲区会不断膨胀,最终可能耗尽内存,导致内存溢出错误(OutOfMemoryError)。
- 数据丢失:在某些情况下,如果缓冲区已满且没有有效的背压策略,新的数据可能会覆盖旧的数据,从而导致数据丢失,影响系统的准确性和完整性。
- 性能下降:过多的数据积压会导致系统响应变慢,因为处理线程可能会花费大量时间在处理积压的数据上,而无法及时处理新的任务,降低了整个系统的性能。
RxJava中处理背压的方式
- Flowable:RxJava 2引入了
Flowable
类专门用于处理背压。Flowable
使用Publisher
-Subscriber
模型,与传统的Observable
-Observer
模型类似,但增加了背压支持。- 请求机制:
Subscriber
通过onSubscribe
方法中的Subscription
对象向Publisher
请求数据。Subscription
的request(long n)
方法用于告诉Publisher
它准备好接收n
个数据项。Publisher
会根据这个请求量来发送数据。 - 背压策略:
- BUFFER:
Flowable.create(Callable<Publisher<T>> supplier, BackpressureStrategy.BUFFER)
这种策略会尽可能多地缓存数据,直到内存耗尽。适用于短时间内数据量可预测且不会导致内存问题的场景。 - DROP:
Flowable.create(Callable<Publisher<T>> supplier, BackpressureStrategy.DROP)
如果下游处理速度慢,新数据到达时,如果缓冲区已满,新数据将直接被丢弃。 - LATEST:
Flowable.create(Callable<Publisher<T>> supplier, BackpressureStrategy.LATEST)
只保留最新的数据,当缓冲区满时,丢弃旧数据,只保留最新的一条。适用于只关心最新数据的场景,如实时监控数据。 - ERROR:
Flowable.create(Callable<Publisher<T>> supplier, BackpressureStrategy.ERROR)
如果缓冲区满且有新数据到来,立即抛出MissingBackpressureException
异常,提醒开发者处理背压问题。
- BUFFER:
- 请求机制:
- 操作符处理背压:
subscribeOn(Scheduler scheduler)
:可以指定Publisher
在哪个调度器上运行,从而控制数据产生的速度。例如,将数据产生放在一个较慢的调度器上,使其与下游处理速度相匹配。observeOn(Scheduler scheduler)
:指定Subscriber
在哪个调度器上执行,确保下游处理逻辑在合适的线程环境中运行,避免因线程阻塞等问题导致背压。buffer(int count)
:可以将数据按指定数量进行缓冲,下游每次请求时获取一个缓冲区的数据,从而控制数据流量。例如,Flowable.buffer(10)
表示每10个数据组成一个缓冲区发送给下游。