1. ReactiveException
- 出现情况:这是Reactor中所有反应式异常的基类,通常作为其他特定异常的父类,当遇到一般的反应式编程相关异常时可能抛出。
- 处理方式:在Reactor中,可以使用
onErrorResume
操作符来捕获并处理该异常及其子类异常。例如:
Mono.just("test")
.map(str -> {
if ("error".equals(str)) {
throw new ReactiveException("Simulated ReactiveException");
}
return str;
})
.onErrorResume(ReactiveException.class, error -> Mono.just("Error handled: " + error.getMessage()));
2. TimeoutException
- 出现情况:当在规定时间内没有接收到期望的信号(如
onNext
、onComplete
)时抛出。例如,在设置了timeout
操作符后,如果在指定时间内Publisher
没有发出信号就会抛出此异常。
- 处理方式:通过
onErrorResume
操作符捕获并处理,也可以使用retry
操作符在出现TimeoutException
时尝试重新订阅。示例如下:
Mono.delay(Duration.ofSeconds(5))
.timeout(Duration.ofSeconds(2))
.onErrorResume(TimeoutException.class, error -> Mono.just("Timeout handled: " + error.getMessage()))
.retryWhen(Retry.backoff(3, Duration.ofSeconds(1))
.filter(TimeoutException.class::isInstance));
3. EmptyPublisherException
- 出现情况:当
Publisher
没有发出任何元素就直接发出onComplete
信号时抛出,比如使用Mono.empty()
且后续代码期望有元素时。
- 处理方式:使用
onErrorResume
操作符处理,可提供默认值或执行其他逻辑。示例:
Mono.empty()
.onErrorResume(EmptyPublisherException.class, error -> Mono.just("Empty handled: " + error.getMessage()));
4. CancelException
- 出现情况:当订阅被取消时抛出,例如调用
Disposable
的dispose
方法取消订阅,或者在某些情况下,流的处理被提前终止时可能出现。
- 处理方式:同样可以用
onErrorResume
操作符捕获处理。如:
Flux.range(1, 10)
.doOnSubscribe(subscription -> subscription.cancel())
.onErrorResume(CancelException.class, error -> Mono.just("Cancel handled: " + error.getMessage()));