reactor是一个基于reactive streams的呼应式编程结构。 在了解project reactor 项目之前, 你需求熟悉观察者形式(订阅发布形式)和reactive streams。
只有了解了这些内容,才干更好的开端project reactor的学习。你能够经过看
观察者形式之我见 和
一篇入门reactive streams背压呼应式编程
了简略学习这两个常识点。
主张的学习办法
学习reactor的总过程和前置条件
- 首要了解同步也异步的概念,了解为什么需求异步
- 了解观察者形式,了解为什么需求观察者形式
- 了解reactive streams,至少知道观察者形式(订阅发布形式)在reactive streams中是怎么要求的。
- 要学习和运用jdk 8 中stream的操作办法和风格
- 开端学习reactor。
学习reactor的时分主张:
- 先了解reactor的根本流程再去学习
- 首要了解了flux和mono在订阅形式中的效果和位置,不要被flux和mono发生数据的办法和操作符的许多常识所迷惑。
- 了解了subscribe的办法和调用。
- 大约知道调度器scheduler的效果和运用,大约知道有一个hooks和效果即可。
- 大约翻阅flux和mono创建数据的办法,用的时分能够查阅。
- 大约翻阅flux和mono的操作符,用的时分能够查阅。
- 上手实在的项目,开端运用reactor,需求的时分查阅文档。项目中假如有必要再去学习context。
- 等reactor能上手搞定项目,再试着去经过scheduler和hooks来优化项目。
假如你是为了面试,当我没说。
数据源Flux 和 Mono
reactor正如一切的发布订阅形式相同,符合reactive streams规范。 所以reactor也包含有publisher, subscriber, subscription, processor, operator等概念。
Flux和Mono便是reactor完结的publisher,他们能够承受被其他的订阅器所订阅,发生数据并且把数据推送给订阅器。 同时他们还集成了一些对数据流的操作,比如map, filter等。
区别
Flux是一个包含0到N个元素的数据流,Mono是一个包含0或者1个元素的数据流。
根本流程
总体上了解了reactor的流程,才干不被小事的概念迷失了方向。其实整个reactor就一个订阅发布形式。
Flux和Mono是整个系统默许的publisher,目的是为了简化publisher自定义的作业。
Flux和Mono集成了许多的操作符,用来削减咱们自定义subscriber和processor的作业量。
由于操作符的存在,咱们对数据源和元素的操作就不需求自己定义自己的processor和subscriber了,直接运用操作符的组合即可完结作业.
除非不得已,否则不要试图自定义subscriber和processor。
graph LR
A[创建数据源] --> B[调用操作符对数据进行处理]
B --> C[subscibe订阅数据源]
创建Flux/Mono数据源
了解了发布订阅形式和publisher的效果,就了解了flux和mono。Flux和mono为了满意需求,有大量的发生数据的办法,
由于篇幅问题,我把这部分内容独自进行了收拾,详见reactor之数据源的发生
操作符
在根本流程中,现已提到了reactor为了削减自定义subscriber和processor的作业量,集成了许多的操作符。
首要应该大约了解操作符的效果和运用场景,大约知道有哪些品种的操作符即可。
用到的时分无妨翻阅官方文档,常用的不必记,由于经常会用到。不常用的更不必回忆,由于记了也用不到。
由于篇幅问题,我把这部分内容独自进行了收拾,详见reactor之操作符
subscribe
subscribe 操作符用来订阅流中的元素。
当流中的元素没有被订阅的时分,一切的操作都不会触发,只有当流中的元素被订阅的时分,一切的操作才会触发。
常用的subscribe接口如下
Flux.subscribe();
/**
* @param consumer 顾客接口,用来消费流中的元素
*
*/
Flux.subscribe(Consumer<? super T> consumer);
/**
* @param consumer 顾客接口,用来消费流中的元素
* @param errorConsumer 过错顾客接口,用来消费流中的过错
*/
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
/**
* @param consumer 顾客接口,用来消费流中的元素
* @param errorConsumer 过错顾客接口,用来消费流中的过错
* @param completeConsumer 完结顾客接口,用来消费流中的完结
*/
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
/**
* @param consumer 顾客接口,用来消费流中的元素
* @param errorConsumer 过错顾客接口,用来消费流中的过错
* @param completeConsumer 完结顾客接口,用来消费流中的完结
* @param subscriptionConsumer 订阅顾客接口,用来消费流中的订阅
*/
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer)
Scheduler
Reactor也能够被认为是 并发无关(concurrency agnostic)的。意思便是, 它并不强制要求任何并发模型。
更进一步,它将选择权交给开发者。不过,它还是供给了一些便利 进行并发履行的库。
Reactor 供给了两种在呼应式链中调整调度器 Scheduler 的办法:publishOn 和 subscribeOn。
它们都承受一个 Scheduler 作为参数,然后能够改变调度器。
但是 publishOn 在链中呈现的位置 是有考究的,而 subscribeOn 则无所谓。
publishOn 它会 改变后续的操作符的履行地点线程 。而 subscribeOn 则会改变下游操作符的调度器。
在 Reactor 中,履行形式以及履行过程取决于所运用的 Scheduler。
- 当时线程(Schedulers.immediate())
- 单线程(Schedulers.single())
- 固定巨细线程池(Schedulers.parallel())
- 弹性线程池(Schedulers.elastic())
Flux.just(1, 2, 3)
.publishOn(Schedulers.parallel()) //指定在parallel线程池中履行
.map(i -> {
System.out.println("map1: " + Thread.currentThread().getName());
return i;
})
.publishOn(Schedulers.elastic()) // 指定下游的履行线程
.map(i -> {
System.out.println("map2: " + Thread.currentThread().getName());
return i;
})
.subscribeOn(Schedulers.single())
.subscribe(i -> System.out.println("subscribe: " + Thread.currentThread().getName()));
此外一些操作符会运用指定的调度器。
Flux.interval(Duration.ofSeconds(1), Schedulers.single())
.subscribe(System.out::println);
processor
Processor 是一个完结了 Publisher 和 Subscriber 接口的对象,它能够用来连接 Publisher 和 Subscriber。
大都情况下,你应该进行防止运用 Processor,它们较难正确运用,首要用于一些特别场景下。
比起直接运用 Reactor 的 Processors,更好的方式是经过调用一次 sink() 来得到 Processor 的 Sink。
FluxProcessor<String, String> processor = DirectProcessor.create();
processor.subscribe(System.out::println);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();
sink
Sink 是一个接口,它定义了一些办法,用来向 Processor 发送数据。
UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<String> sink = processor.sink();
sink.next("foo");
sink.next("bar");
sink.next("baz");
sink.complete();
现有的 Processors 总览
- DirectProcessor:直接处理器,它是一个同步的处理器,它会将一切的数据发送给一切的订阅者。
- UnicastProcessor:单播处理器,它是一个同步的处理器,它只会将数据发送给第一个订阅者。
- ReplayProcessor:重放处理器,它是一个异步的处理器,它会将一切的数据发送给一切的订阅者,包含那些在订阅之后才订阅的订阅者。
- WorkQueueProcessor:作业队列处理器,它是一个异步的处理器,它会将一切的数据发送给一切的订阅者,包含那些在订阅之后才订阅的订阅者。
- TopicProcessor:主题处理器,它是一个异步的处理器,它会将一切的数据发送给一切的订阅者,包含那些在订阅之后才订阅的订阅者。
- EmitterProcessor:发射处理器,它是一个异步的处理器,它会将一切的数据发送给一切的订阅者,包含那些在订阅之后才订阅的订阅者。
Hooks
Hooks算是一个工具类,设定好今后,对后边的Flux和Mono都会回调Hooks设置的办法,相似操作系统的钩子。
本部分算是reactor中比较高档的部分,主张在开端上手用reactor做项现在,大约知道有这么一个概念即可。
做了一两个项目今后,再回头来看看hooks是做什么的即可
我把这部分的内容进行了拆分,详见:reactor之Hooks
Context
当从指令式编程风格切换到呼应式编程风格的时分,一个技术上最大的挑战便是线程处理。
在指令式编程风格中,咱们能够经过 ThreadLocal 来传递数据,
但是在呼应式编程风格中,咱们无法经过 ThreadLocal 来传递数据。
由于线程是由 Reactor 来管理的,咱们无法控制线程的创建和销毁。
Context 便是用来解决这个问题的。Context 是一个接口,它定义了一些办法,用来获取和设置数据。
这部分内容相对也比较难以了解,主张把学习和了解放在后边,总之你需求用到相似多线程环境中的ThreadLocal类的时分,再来学习这部分不迟。
String key = "key";
Mono<String> r = Mono.just("hello")
.flatMap(s -> Mono.subscriberContext()
.map(ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world
context api
Context 是一个相似于 Map(这种数据结构)的接口:它存储键值(key-value)对,你需求经过 key 来获取值:
- put 办法:将一个键值对放入 Context 中。
- get 办法:经过 key 来获取值。
- delete 办法:经过 key 来删除键值对。
- hasKey 办法:经过 key 来判断是否存在键值对。
- stream 办法:返回一个流,用来遍历 Context 中的一切键值对。
- isEmpty 办法:判断 Context 是否为空。
- size 办法:返回 Context 中键值对的个数。
- putAll 办法:将一个 Context 中的一切键值对放入另一个 Context 中。
- currentContext 办法:返回当时线程的 Context。
- empty 办法:返回一个空的 Context。
- root 办法:返回一个空的 Context。
把context 绑定到Flux and writing
String key = "key";
Flux<String> r = Flux.just("hello")
.flatMap(s -> Mono.subscriberContext()
.subscriberContext(ctx -> ctx.put(key, "world"));
.map(ctx -> s + " " + ctx.get(key)))
从context中读取数据
String key = "key"
Flux<String> r = Flux.just("hello")
.flatMap(s -> Mono.subscriberContext()
.map(ctx -> s + " " + ctx.get(key)))
.subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world