Spring Project Reactor从入门到放弃

reactor是一个基于reactive streams的呼应式编程结构。 在了解project reactor 项目之前, 你需求熟悉观察者形式(订阅发布形式)和reactive streams。
只有了解了这些内容,才干更好的开端project reactor的学习。你能够经过看
观察者形式之我见 和
一篇入门reactive streams背压呼应式编程
了简略学习这两个常识点。

主张的学习办法

学习reactor的总过程和前置条件

  1. 首要了解同步也异步的概念,了解为什么需求异步
  2. 了解观察者形式,了解为什么需求观察者形式
  3. 了解reactive streams,至少知道观察者形式(订阅发布形式)在reactive streams中是怎么要求的。
  4. 要学习和运用jdk 8 中stream的操作办法和风格
  5. 开端学习reactor。

学习reactor的时分主张:

  1. 先了解reactor的根本流程再去学习
  2. 首要了解了flux和mono在订阅形式中的效果和位置,不要被flux和mono发生数据的办法和操作符的许多常识所迷惑。
  3. 了解了subscribe的办法和调用。
  4. 大约知道调度器scheduler的效果和运用,大约知道有一个hooks和效果即可。
  5. 大约翻阅flux和mono创建数据的办法,用的时分能够查阅。
  6. 大约翻阅flux和mono的操作符,用的时分能够查阅。
  7. 上手实在的项目,开端运用reactor,需求的时分查阅文档。项目中假如有必要再去学习context。
  8. 等reactor能上手搞定项目,再试着去经过scheduler和hooks来优化项目。

假如你是为了面试,当我没说。

数据源Flux 和 Mono

reactor正如一切的发布订阅形式相同,符合reactive streams规范。 所以reactor也包含有publisher, subscriber, subscription, processor, operator等概念。
Flux和Mono便是reactor完结的publisher,他们能够承受被其他的订阅器所订阅,发生数据并且把数据推送给订阅器。 同时他们还集成了一些对数据流的操作,比如map, filter等。

区别

Flux是一个包含0到N个元素的数据流,Mono是一个包含0或者1个元素的数据流。

根本流程

总体上了解了reactor的流程,才干不被小事的概念迷失了方向。其实整个reactor就一个订阅发布形式。
Flux和Mono是整个系统默许的publisher,目的是为了简化publisher自定义的作业。

Flux和Mono集成了许多的操作符,用来削减咱们自定义subscriber和processor的作业量。
由于操作符的存在,咱们对数据源和元素的操作就不需求自己定义自己的processor和subscriber了,直接运用操作符的组合即可完结作业.
除非不得已,否则不要试图自定义subscriber和processor。

graph LR
A[创建数据源] --> B[调用操作符对数据进行处理]
B --> C[subscibe订阅数据源]

创建Flux/Mono数据源

了解了发布订阅形式和publisher的效果,就了解了flux和mono。Flux和mono为了满意需求,有大量的发生数据的办法,
由于篇幅问题,我把这部分内容独自进行了收拾,详见reactor之数据源的发生

操作符

在根本流程中,现已提到了reactor为了削减自定义subscriber和processor的作业量,集成了许多的操作符。
首要应该大约了解操作符的效果和运用场景,大约知道有哪些品种的操作符即可。
用到的时分无妨翻阅官方文档,常用的不必记,由于经常会用到。不常用的更不必回忆,由于记了也用不到。
由于篇幅问题,我把这部分内容独自进行了收拾,详见reactor之操作符

subscribe

subscribe 操作符用来订阅流中的元素。
当流中的元素没有被订阅的时分,一切的操作都不会触发,只有当流中的元素被订阅的时分,一切的操作才会触发。
常用的subscribe接口如下


Flux.subscribe();
/**
 * @param consumer 顾客接口,用来消费流中的元素
 *                 
 */
Flux.subscribe(Consumer<? super T> consumer);
/**
 * @param consumer 顾客接口,用来消费流中的元素
 * @param errorConsumer 过错顾客接口,用来消费流中的过错
 */
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer);
/**
 * @param consumer 顾客接口,用来消费流中的元素
 * @param errorConsumer 过错顾客接口,用来消费流中的过错
 * @param completeConsumer 完结顾客接口,用来消费流中的完结
 */
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer);
/**
 * @param consumer 顾客接口,用来消费流中的元素
 * @param errorConsumer 过错顾客接口,用来消费流中的过错
 * @param completeConsumer 完结顾客接口,用来消费流中的完结
 * @param subscriptionConsumer 订阅顾客接口,用来消费流中的订阅
 */              
Flux.subscribe(Consumer<? super T> consumer, Consumer<? super Throwable> errorConsumer, Runnable completeConsumer, Consumer<? super Subscription> subscriptionConsumer)

Scheduler

Reactor也能够被认为是 并发无关(concurrency agnostic)的。意思便是, 它并不强制要求任何并发模型。
更进一步,它将选择权交给开发者。不过,它还是供给了一些便利 进行并发履行的库。
Reactor 供给了两种在呼应式链中调整调度器 Scheduler 的办法:publishOn 和 subscribeOn。
它们都承受一个 Scheduler 作为参数,然后能够改变调度器。
但是 publishOn 在链中呈现的位置 是有考究的,而 subscribeOn 则无所谓。
publishOn 它会 改变后续的操作符的履行地点线程 。而 subscribeOn 则会改变下游操作符的调度器。

在 Reactor 中,履行形式以及履行过程取决于所运用的 Scheduler。

  • 当时线程(Schedulers.immediate())
  • 单线程(Schedulers.single())
  • 固定巨细线程池(Schedulers.parallel())
  • 弹性线程池(Schedulers.elastic())
Flux.just(1, 2, 3)
        .publishOn(Schedulers.parallel()) //指定在parallel线程池中履行
        .map(i -> {
            System.out.println("map1: " + Thread.currentThread().getName());
            return i;
        })
        .publishOn(Schedulers.elastic()) // 指定下游的履行线程
        .map(i -> {
            System.out.println("map2: " + Thread.currentThread().getName());
            return i;
        })
        .subscribeOn(Schedulers.single())
        .subscribe(i -> System.out.println("subscribe: " + Thread.currentThread().getName()));

此外一些操作符会运用指定的调度器。

Flux.interval(Duration.ofSeconds(1), Schedulers.single())
        .subscribe(System.out::println);

processor

Processor 是一个完结了 Publisher 和 Subscriber 接口的对象,它能够用来连接 Publisher 和 Subscriber。
大都情况下,你应该进行防止运用 Processor,它们较难正确运用,首要用于一些特别场景下。
比起直接运用 Reactor 的 Processors,更好的方式是经过调用一次 sink() 来得到 Processor 的 Sink。

FluxProcessor<String, String> processor = DirectProcessor.create();
processor.subscribe(System.out::println);
processor.onNext("foo");
processor.onNext("bar");
processor.onNext("baz");
processor.onComplete();

sink

Sink 是一个接口,它定义了一些办法,用来向 Processor 发送数据。

UnicastProcessor<Integer> processor = UnicastProcessor.create();
FluxSink<String> sink = processor.sink();
sink.next("foo");
sink.next("bar");
sink.next("baz");
sink.complete();

现有的 Processors 总览

  • DirectProcessor:直接处理器,它是一个同步的处理器,它会将一切的数据发送给一切的订阅者。
  • UnicastProcessor:单播处理器,它是一个同步的处理器,它只会将数据发送给第一个订阅者。
  • ReplayProcessor:重放处理器,它是一个异步的处理器,它会将一切的数据发送给一切的订阅者,包含那些在订阅之后才订阅的订阅者。
  • WorkQueueProcessor:作业队列处理器,它是一个异步的处理器,它会将一切的数据发送给一切的订阅者,包含那些在订阅之后才订阅的订阅者。
  • TopicProcessor:主题处理器,它是一个异步的处理器,它会将一切的数据发送给一切的订阅者,包含那些在订阅之后才订阅的订阅者。
  • EmitterProcessor:发射处理器,它是一个异步的处理器,它会将一切的数据发送给一切的订阅者,包含那些在订阅之后才订阅的订阅者。

Hooks

Hooks算是一个工具类,设定好今后,对后边的Flux和Mono都会回调Hooks设置的办法,相似操作系统的钩子。

本部分算是reactor中比较高档的部分,主张在开端上手用reactor做项现在,大约知道有这么一个概念即可。
做了一两个项目今后,再回头来看看hooks是做什么的即可

我把这部分的内容进行了拆分,详见:reactor之Hooks

Context

当从指令式编程风格切换到呼应式编程风格的时分,一个技术上最大的挑战便是线程处理。
在指令式编程风格中,咱们能够经过 ThreadLocal 来传递数据,
但是在呼应式编程风格中,咱们无法经过 ThreadLocal 来传递数据。
由于线程是由 Reactor 来管理的,咱们无法控制线程的创建和销毁。
Context 便是用来解决这个问题的。Context 是一个接口,它定义了一些办法,用来获取和设置数据。

这部分内容相对也比较难以了解,主张把学习和了解放在后边,总之你需求用到相似多线程环境中的ThreadLocal类的时分,再来学习这部分不迟。

String key = "key";
Mono<String> r = Mono.just("hello")
        .flatMap(s -> Mono.subscriberContext()
                .map(ctx -> s + " " + ctx.get(key)))
        .subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world

context api

Context 是一个相似于 Map(这种数据结构)的接口:它存储键值(key-value)对,你需求经过 key 来获取值:

  • put 办法:将一个键值对放入 Context 中。
  • get 办法:经过 key 来获取值。
  • delete 办法:经过 key 来删除键值对。
  • hasKey 办法:经过 key 来判断是否存在键值对。
  • stream 办法:返回一个流,用来遍历 Context 中的一切键值对。
  • isEmpty 办法:判断 Context 是否为空。
  • size 办法:返回 Context 中键值对的个数。
  • putAll 办法:将一个 Context 中的一切键值对放入另一个 Context 中。
  • currentContext 办法:返回当时线程的 Context。
  • empty 办法:返回一个空的 Context。
  • root 办法:返回一个空的 Context。

把context 绑定到Flux and writing

String key = "key";
Flux<String> r = Flux.just("hello")
        .flatMap(s -> Mono.subscriberContext()
        .subscriberContext(ctx -> ctx.put(key, "world"));
                .map(ctx -> s + " " + ctx.get(key)))

从context中读取数据

String key = "key"
Flux<String> r = Flux.just("hello")
        .flatMap(s -> Mono.subscriberContext()
                .map(ctx -> s + " " + ctx.get(key)))
        .subscriberContext(ctx -> ctx.put(key, "world"));
r.subscribe(System.out::println);
// 输出:hello world