初识呼应式编程的时分,除了从指令式的思想办法转变为函数式的编程办法外,其间有一个很大的不适应的地方便是在面对反常时该怎样处理,尤其是面对查看反常(Checked Exception)时更是手足无措。在遇到反常时,咱们通用的处理办法便是打日志、降级兜底、重试三板斧,本文经过Project Reactor的文档以及源码来深入解读,在reactor中是怎样高雅地完成这反常处理三板斧。
在介绍怎样运用前,咱们先回忆下在用reactor编程的时分,遇到的几个问题:
- 遇到反常时,假如能处理,我该怎样兜底/降级
- 遇到无法处理的反常时,我该怎样打印日志,并往外抛
- 遇到声明晰查看反常的办法时,该怎样处理
- 假如调用失利了(如请求超时),该怎样重试
- 假如出现反常了,流里边的后续数据还会持续发送吗
反常处理的底层机制
在答复这些问题,就需求咱们首要对reactor处理反常的机制要有理解。先说定论,如文档上说的:
Before you learn about error-handling operators, you must keep in mind that any error in a reactive sequence is a terminal event. Even if an error-handling operator is used, it does not let the original sequence continue. Rather, it converts the onError signal into the start of a new sequence (the fallback one). In other words, it replaces the terminated sequence upstream of it.
即,一旦出现了反常,那原先的数据流就会直接完毕了,是没有办法再恢复的。所以假如要降级兜底,那只能再替换一个新的流。或许重试,但其实也是相当于创建了新流,仅仅数据和原先的相同。
那为什么原先的流就完毕了呢? 或许说怎样就完毕了呢?
先抛开反常处理的论题,咱们回到最根底的层面,假如要自动完毕一个流,该怎样完毕呢?比方一个网络连接,假如出现反常了,流完毕了,该怎样开释资源呢?
咱们知道,每次订阅调用的时分,都会回来一个Disposable目标,如Disposable disposable = Flux.just(1,2,3).subscribe()
。所以,假如要自动完毕一个流,其实便是调用Disposable目标的dispose办法。再深入下去,就会发现,其dispose办法内部其实调用的是由publisher发生的subscription的cancel办法。只需调用cancel办法,才能完美的完毕publisher并开释资源。所以,要想完毕一个流,只需调用subscription的cancel办法。
所以,当出现反常时,原先的流会完毕的原因,其实便是调用了subscription的cancel办法了。那是何时调用的呢,咱们以FluxMap为例,看下源码。
public void onNext(T t) {
if (done) {
Operators.onNextDropped(t, actual.currentContext());
return;
}
R v;
try {
v = Objects.requireNonNull(mapper.apply(t),
"The mapper returned a null value.");
}
catch (Throwable e) {
Throwable e_ = Operators.onNextError(t, e, actual.currentContext(), s);
if (e_ != null) {
onError(e_);
}
else {
s.request(1);
}
return;
}
actual.onNext(v);
}
能够看到,在数据处理onNext()的办法内部,一般都有相似的try-catch结构。当出现反常时,会先对反常进行处理,确认是否需求处理反常以及怎样处理,其逻辑都在Operators.onNextError这个办法里。若要处理反常时,则会进入onError的流程里。
下面重点来看看Operators.onNextError这个办法,它首要首要包含了两件事:
- 这个反常要不要吃掉,当做非反常处理:详细能够拜见Operators.onNextErrorStrategy办法
- 这个反常要不要往下传,即假如是严重反常时,则直接抛出;不然,对订阅进行cancel。详见Operators.onOperatorError办法.
削减篇幅起见,这里只贴下Operators.onOperatorError的办法,其实onNextErrorStrategy办法也很重要,它会从context里拿出对指定反常能处理的strategy来进行处理,经过传入的value以及发生的error决议是否要抛出这个反常。
public static Throwable onOperatorError(@Nullable Subscription subscription,
Throwable error,
@Nullable Object dataSignal, Context context) {
Exceptions.throwIfFatal(error);
if(subscription != null) {
subscription.cancel();
}
Throwable t = Exceptions.unwrap(error);
BiFunction<? super Throwable, Object, ? extends Throwable> hook =
context.getOrDefault(Hooks.KEY_ON_OPERATOR_ERROR, null);
if (hook == null) {
hook = Hooks.onOperatorErrorHook;
}
if (hook == null) {
if (dataSignal != null) {
if (dataSignal != t && dataSignal instanceof Throwable) {
t = Exceptions.addSuppressed(t, (Throwable) dataSignal);
}
//do not wrap original value to avoid strong references
/*else {
}*/
}
return t;
}
return hook.apply(error, dataSignal);
}
从onOperatorError的源码里能够看到:
- Exceptions.throwIfFatal决议是否直接抛出反常仍是进入onError流程。比方JVM的反常,例如OutOfMemory之类的,就会直接抛出。
- 接下来,便是咱们前面想要的答案,反常时数据流会完毕的原因,便是默认会调用subscription.cancel()。
- 最终,是对反常处理的钩子hook,能够经过传入的onNext的数据dataSingal以及反常error进行处理,例如打印日志或许进行反常的转化,甚至是吃掉反常。
实操 —— try-catch-finally的平替
在了解了reactor中对反常处理的机制后,咱们看看有哪些操作符能够用来代替以前指令式编程中的try-catch-finally的结构。遇到反常时,咱们通用的办法便是打日志、降级兜底、重试三板斧,下面咱们详细看看在reactor中是怎样完成的。
Flux.just(1,2,3)
.doFinally(signalType -> System.out.println("Finally: [" + signalType + "]" ))
.map(t -> {
if (t == 2) {
throw new IllegalArgumentException("Exception:"+t);
}
return t;
})
.doOnError(e -> System.out.println("log: error happened with [" + e.getMessage() + "]"))
.doFinally(signalType -> System.out.println("Finally: [" + signalType + "]" ))
.onErrorReturn(42)
// .onErrorResume(e -> Flux.just(11,12,13))
.subscribe(d -> System.out.println("data:" + d), e -> System.out.println("ERROR:" + e.getMessage()), () -> System.out.println("Completed!"));
这是一个运用示例,其输出如下,下面会对这些操作符进行介绍。
data:1
Finally: [cancel]
log: error happened with [Exception:2]
data:42
Completed!
Finally: [onError]
1. 降级兜底 – onErrorReturn/onErrorResume
当遇到反常的时分,能够运用onErrorReturn来处理,回来一个默认值,其底层完成其实用的仍是onErrorResume。比较onErrorReturn只能回来一个默认值而言,onErrorResume更灵活,它能够依据不同的error类型,还完成不同的回来值,其fallback函数的入参是反常类型,回来的则是一个Publisher。所以从回来值也能够看出来,onErrorReturn/onErrorResume回来的是一个新的流,旧的流现已在发生反常的时分就完毕了。
public final Flux<T> onErrorResume(Predicate<? super Throwable> predicate,
Function<? super Throwable, ? extends Publisher<? extends T>> fallback) {
Objects.requireNonNull(predicate, "predicate");
return onErrorResume(e -> predicate.test(e) ? fallback.apply(e) : error(e));
}
这是onErrorResume的完成,其供给了特别灵活的处理办法,predicate决议是否要进行fallback,fallback下能够依据反常类型回来恣意的数据流。假如不需求对反常fallback,即predicate为false时,则直接回来FluxError的封装,进入onError阶段。
再补充一点,既然fallback是回来的一个新的流,那么即能够fallback回来一个单值,例如onErrorReturn那样,也能够回来多个值的数据流,例如:
Flux.just(1,2,3)
.map(t -> {
if (t == 2) {
throw new IllegalArgumentException("Exception:"+t);
}
return t;
})
.onErrorResume(e -> Flux.just(11,12,13))
.subscribe(d -> System.out.println("data:" + d), e -> System.out.println("ERROR:" + e.getMessage()), () -> System.out.println("Completed!"));
输出成果会是:1 11 12 13
。
其实,关于Flux而言,因为是多值的数据流,其很难依据反常error就进行适宜的兜底,因为兜底往往取决于输入,而反常的时分往往丢失了数据data的信息了,所以关于Flux的降级,onErrorReturn/onErrorResume实用性不是太强,因为回来的新的流很难代替旧的流,甚至你都不知道旧的Flux流有多少数据量。
比较之下,onErrorReturn/onErrorResume用于单值的Mono就显得更为适宜了。因为用法相同,这里就不过多赘述了。
所以,关于Flux流的降级兜底是个很困难的事情,有一种办法能够让onErrorReturn/Resume获取到当时的数据data,那便是使用前面末节中提到的,添加Operators.onOperatorError中的onOperatorErrorHook,把数据data塞入反常中再回来给onErrorReturn/Resume。
最终再看下FluxOnErrorResume是怎样完成降级的。
public void onError(Throwable t) {
if (!second) {
second = true;
Publisher<? extends T> p;
try {
//nextFactory即fallback函数
p = Objects.requireNonNull(nextFactory.apply(t),
"The nextFactory returned a null Publisher");
}
catch (Throwable e) {
Throwable _e = Operators.onOperatorError(e, actual.currentContext());
_e = Exceptions.addSuppressed(_e, t);
actual.onError(_e);
return;
}
//从头订阅一个新的流,其source便是fallback函数发生的publisher
p.subscribe(this);
}
else {
actual.onError(t);
}
}
当上游出现反常时,例如从前示例中的FluxMap,就会进入onError阶段,此刻正好被onErrorResume的onError阶段阻拦,然后使用fallback函数发生新的流,再从头订阅p.subscribe(this)
。别的,也能够看出,新的流只会作用于onErrorResume之后的operator,前面的operator则不会有作用。
2. 打印日志 – doOnError
打印日志就比较简单了,能够用doOnError办法来完成。doOnError的底层则用的FluxPeek来完成,其作用是覆写了一切的接口,如onNext,onError, cancel等,经过覆写来完成hook。简直一切doOnXXX的办法都是依靠FluxPeek完成的,例如log、doOnNext、doOnError等等。因为与本次的主题无关,不再赘述,感兴趣的能够自行翻看FluxPeek的完成。
需求留意的是:虽然doOnXXX首要用于打印日志,但假如doOnXXX内部出错,也会导致整个流完毕,进入onError阶段。所以,也是有副作用的,仍然在主流程中。
3. finally – doFinally
try-catch-finally中的关键字finally能够经过办法doFinally来平替。需求留意的是doFinally办法的履行次序以及触发时机。
一般,finally的含义是确保100%被履行,也便是出错onError的时分履行,正常完毕onComplete也履行。但在reactor中,除了这两个事情外,还未能确保doFinally百分之百履行,还需求添加cancel的状况。其原因是,当出现反常后,关于反常的上游会走cancel流程,下游则走onError流程。如从前的示例,触发doFinally的信号分别是:cancel与onError。
最终再说一下履行次序,如从前的示例中那样,doFinally并不是按出现的次序履行,也不是一定是在最终履行的(这个差异与finally关键词不同很大)。其原因在于,当出现反常时,会先cancel掉原先的数据流,再调用onError处理(能够拜见前面FluxMap的源码)。
所以,示例中,”Finally: [cancel]”会先被打印,然后才是onErrorReturn的履行,即进入onError阶段。
那为什么第二个doFinally虽然出现在onErrorReturn之前,但又是最终履行的呢?
这是因为在完成doFinally的时分,先调用了下游的onError办法,再履行自身doFinally的办法,拜见FluxDoFinally的完成:
public void onError(Throwable t) {
try {
actual.onError(t);
}
finally {
runFinally(SignalType.ON_ERROR);
}
}
这样就能契合try-catch-finally的履行次序了。
所以,doFinally出现的方位很重要,若出现在反常前面,就会优先履行(不会像finally那样最终履行),若出现在反常后面,则会最终履行(相似finally)。
4. try-with-resource
关于try-with-resource,reactor也给了代替的完成,那便是using操作符:
public static <T, D> Flux<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends
Publisher<? extends T>> sourceSupplier, Consumer<? super D> resourceCleanup, boolean eager) {
return onAssembly(new FluxUsing<>(resourceSupplier,
sourceSupplier,
resourceCleanup,
eager));
}
其间resourceSupplier是创建生成资源的函数,sourceSupplier则是针对生成的resource进行操作并发生数据流,resourceCleanup则是在完毕后(不论成功仍是失利)进行资源的开释。
以try-with-resource为例:
try (SomeAutoCloseable disposableInstance = new SomeAutoCloseable()) {
return disposableInstance.toString();
}
使用using函数,则能够写成:
Flux.using(
() -> new SomeAutoCloseable(),
disposableInstance -> Flux.just(disposableInstance.toString()),
AutoCloseable::close
);
5. 重试 – retry / retryWhen
除了以上办法处理反常时,还有一种常见的办法便是重试。比方,咱们调用某个接口超时时,一般会重试一次,这个时分能够运用retry办法,如:
Flux.interval(Duration.ofMillis(250))
.map(input -> {
if (input < 3) return "tick " + input;
throw new RuntimeException("boom");
})
.retry(1)
.subscribe(System.out::println, System.err::println);
会对Flux流履行两次,其成果是:0 1 2 0 1 2
,即当遇到data为3时,会重试一次。
其基本思想很简单,便是阻拦onError流程,核算重试的次数,假如重试未超越,则从头订阅:
public void onError(Throwable t) {
long r = remaining;
if (r != Long.MAX_VALUE) {
if (r == 0) {
actual.onError(t);
return;
}
remaining = r - 1;
}
resubscribe();
}
这里的remaining
便是能够重试的次数,直到重试为0,再一次进入actual.onError。从头订阅的办法也很简单,便是把上游的source
与下游的actual
,再来一次subscribe:source.subscribe(actual)
。
除了retry
外,还有一个高档版别retryWhen
,它除了能像retry那样重试固定的次数外,还能支撑各种重试战略,因为retryWhen的源码相对杂乱,这里不再叙述(毕竟本文不是源码解读),但除了重试战略有差异外,其重试的机制仍是相同的,把上游与下游从头订阅。
6. 查看反常处理
在java中有一类反常是需求显现进行处理的,那便是查看反常(Checked Exception),如IOException。在指令式编程中,能够经过throws关键字来声明,从而能够把反常往外抛,而不需求立即处理。但是,遗憾的是,在reactor中,并没有相似的平替,不论任何状况,当遇到查看反常,reactor中都需求用try-catch来处理,这是唯一一个在reactor中没有找到指令式编程中的平替。
与指令式编程有throws关键字声明不同,reactor中处理查看反常都必须用try-catch来处理,处理的办法有以下三种:
- 捕获到反常并从中恢复。序列持续正常的进行。
- 捕获反常,将其封装成一个 不查看 的反常,然后将其抛出(中止序列)。
- 假如你需求回来一个 Flux(例如,在 flatMap 中),那么就用一个发生错误的 Flux 来封装反常,如下所示:return Flux.error(checkedException)。(这个序列也会停止。)
这三种办法中,其间最常见也最常用的办法便是第二种,将查看反常转化为非查看反常,如throw new RuntimeException(e)
。但是reactor供给了辅助东西类Exceptions,进而能够相对高雅简洁的进行统一处理。
如以下这个比如(来自reactor的文档):
public String convert(int i) throws IOException {
if (i > 3) {
throw new IOException("boom " + i);
}
return "OK " + i;
}
Flux<String> converted = Flux
.range(1, 10)
.map(i -> {
try { return convert(i); }
catch (IOException e) { throw Exceptions.propagate(e); }
});
converted.subscribe(
v -> System.out.println("RECEIVED: " + v),
e -> {
if (Exceptions.unwrap(e) instanceof IOException) {
System.out.println("Something bad happened with I/O");
} else {
System.out.println("Something bad happened");
}
}
);
因为convert声明晰查看反常IOException,所以必须要try-catch住,再使用Exceptions.propagate来封装为非查看反常。比较于直接用throw new RuntimeException(e)
,使用Exceptions的优点在onError处理阶段能够用Exceptions.unwrap()办法来获取内部真实抛出的反常,体现了使用东西类的优点——简洁明晰。
总结
本文先从reactor反常处理的底层机制讲起,讲清楚了一个基本概念:只需出现反常,不论怎样处理,旧的流都现已完毕,接下来处理的都是新的流。在这根底上,按指令式编程中的try-catch-finally的办法,用reactor的办法进行了一一代替介绍,期望经过对比的办法,能更好的把握在reactor中怎样高雅的处理反常。