Reactor is a fully non-blocking reactive programming foundation for the JVM, with efficient demand management (in the form of managing “backpressure”).

听上去很难,但是它也遵循帕累托法则。 也就是说只要把握它20%的功用,就能完结大部分的日常工作。

以我现在的项目为例,到目前为止咱们还没用过backpressure 。咱们常常运用的功用是

  • (Flux|Mono).just
  • (Flux|Mono).map
  • (Flux|Mono).flatMap
  • (Flux|Mono).onErrorResume
  • (Flux|Mono).filter

这都是一些最简略的功用,但也是最常用的功用。所以不必忧虑,读过这篇博客后我相信你也能够在自己的项目里轻松地运用它。

怎么装置?

添加下面的配置到你的 pom.xml。咱们在这里运用 BOM,这样就不必忧虑版本兼容的问题了。

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-bom</artifactId>
            <version>2020.0.16</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
<dependencies>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

怎么测验?

Reactor提供了一个测验库,咱们能够运用库中的 StepVerifier 来做下面的事情

  • 验证元素及其次序

    StepVerifier.create(Flux.fromArray(new Integer[] {1, 2, 3, 4, 5})).expectNext(1)
        .expectNext(2).expectNext(3).expectNext(4).expectNext(5).verifyComplete();
    
  • 验证错误

    StepVerifier.create(Flux.error(new Exception("some error")))
            .verifyErrorMessage("some error");
    
  • 验证序列是否完毕

    StepVerifier.create(Flux.empty()).verifyComplete();
    

鄙人面的章节里,咱们会运用 StepVerifier 来验证代码。

有哪些基本概念?

Reactor有两个最基本的概念

  • Flux

    表明一列数据,数据个数可能是0个,也可能是恣意多个。

  • Mono

    表明一列数据,数据个数要么是0个,要么是1个。

最常用的那20%功用是什么?

在这一节,我会运用几个概念来对功用进行分组

  • 装箱

    将数据放入 Flux|Mono

  • 转化

    将当前数据转化为别的的数据

  • 检视

    查看元素的值并履行一些操纵,但不会改动元素的值

  • 拆箱

    从 Flux|Mono中取出数据

装箱

怎么处理String?

  • Flux

    @Test
    public void canBeCreatedFromString() {
        StepVerifier.create(Flux.just("hello world")).expectNext("hello world").verifyComplete();
    }
    
  • Mono

    @Test
    public void canBeCreatedFromString() {
        StepVerifier.create(Mono.just("hello world")).expectNext("hello world").verifyComplete();
    }
    

怎么处理Number?

  • Flux

    @Test
    public void canBeCreatedFromNumber() {
        StepVerifier.create(Flux.just(1)).expectNext(1).verifyComplete();
        StepVerifier.create(Flux.just(1.0)).expectNext(1.0).verifyComplete();
    }
    
  • Mono

    @Test
    public void canBeCreatedFromNumber() {
        StepVerifier.create(Mono.just(1)).expectNext(1).verifyComplete();
        StepVerifier.create(Mono.just(1.0)).expectNext(1.0).verifyComplete();
    }
    

怎么处理 Optional 或许 Nullable 的数据?

只要Mono能够履行这个操作

@Test
public void canBeCreatedFromNullableValue() {
    String value = null;
    StepVerifier.create(Mono.justOrEmpty(value)).verifyComplete();
    value = "hello world";
    StepVerifier.create(Mono.justOrEmpty(value)).expectNext("hello world").verifyComplete();
}
@Test
public void canBeCreatedFromOptionalValue() {
    Optional<String> value = Optional.empty();
    StepVerifier.create(Mono.justOrEmpty(value)).verifyComplete();
    value = Optional.of("hello world");
    StepVerifier.create(Mono.justOrEmpty(value)).expectNext("hello world").verifyComplete();
}

怎么处理一个数据生成器?

这里数据生成器是指没有参数的函数,它的函数签名是() -> A。

只要Mono能够履行这个操作。

@Test
public void canBeCreatedFromCallable() {
    StepVerifier.create(Mono.fromCallable(() -> "hello world!")).expectNext("hello world!")
            .verifyComplete();
}

怎么处理Array?

只要Flux能够履行这个操作。

@Test
public void canBeCreatedFromArray() {
    StepVerifier.create(Flux.fromArray(new Integer[] {1, 2, 3, 4, 5})).expectNext(1)
            .expectNext(2).expectNext(3).expectNext(4).expectNext(5).verifyComplete();
}

怎么处理Iterable?

只要Flux能够履行这个操作。

@Test
public void canBeCreatedFromList() {
    List<Integer> list = new LinkedList<Integer>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    StepVerifier.create(Flux.fromIterable(list)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).verifyComplete();
}
@Test
public void canBeCreatedFromSet() {
    Set<Integer> set = new HashSet<Integer>();
    set.add(1);
    set.add(2);
    set.add(2);
    set.add(3);
    set.add(4);
    StepVerifier.create(Flux.fromIterable(set)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).verifyComplete();
}

怎么处理Stream?

只要Flux能够履行这个操作。

@Test
public void canBeCreatedFromStream() {
    Stream<Integer> stream = Stream.of(1, 2, 3, 4, 5);
    StepVerifier.create(Flux.fromStream(stream)).expectNext(1).expectNext(2).expectNext(3)
            .expectNext(4).expectNext(5).verifyComplete();
}

怎么处理Throwable?

  • Flux

    @Test
    public void canBeCreatedFromThrowable() {
        StepVerifier.create(Flux.error(new Exception("some error")))
                .verifyErrorMessage("some error");
    }
    
  • Mono

    @Test
    public void canBeCreatedFromThrowable() {
        StepVerifier.create(Mono.error(new Exception("some error")))
                .verifyErrorMessage("some error");
    }
    

转化

怎么对元素进行过滤?

  • Flux

    @Test
    public void canDoFilter() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
        StepVerifier.create(flux.filter(x -> x > 5)).expectNext(6).verifyComplete();
    }
    
  • Mono

    @Test
    public void canDoFilter() {
        StepVerifier.create(Mono.just(6).filter(x -> x > 5)).expectNext(6).verifyComplete();
        StepVerifier.create(Mono.just(2).filter(x -> x > 5)).verifyComplete();
    }
    

怎么在每个元素上履行函数A -> B?

假设有一个函数A -> B,其间A是Flux|Mono的元素类型, B又不是 Flux|Mono,那么咱们能够运用 (Flux|Mono).map 来在每个元素上履行这个函数。

  • Flux

    @Test
    public void canMapTheTypeOfValueInSequenceToAnotherType() {
        StepVerifier.create(flux.map(x -> x.toString())).expectNext("1").expectNext("2")
                .expectNext("3").expectNext("4").expectNext("5").expectNext("6").verifyComplete();
    }
    @Test
    public void canMapTheValueInSequenceToAnotherValue() {
        StepVerifier.create(flux.map(x -> x + 1)).expectNext(2).expectNext(3).expectNext(4)
                .expectNext(5).expectNext(6).expectNext(7).verifyComplete();
    }
    
  • Mono

    @Test
    public void canMapTheTypeOfValueInSequenceToAnotherType() {
        StepVerifier.create(Mono.just(1).map(x -> x.toString())).expectNext("1").verifyComplete();
    }
    @Test
    public void canMapTheValueInSequenceToAnotherValue() {
        StepVerifier.create(Mono.just(1).map(x -> x + 1)).expectNext(2).verifyComplete();
    }
    

怎么在每个元素上履行函数A -> (Flux|Mono)<B>?

假设有一个函数A -> (Flux|Mono)<B>,其间A是Flux|Mono的元素类型,那么咱们能够运用 (Flux|Mono).flatMap 或许 Mono.flatMapMany 来在每个元素上履行这个函数。

  • Flux

    @Test
    public void canMapTheValueInSequenceToAnotherSequence() {
        StepVerifier.create(flux.flatMap(x -> Flux.just(x, x))).expectNext(1).expectNext(1)
                .expectNext(2).expectNext(2).expectNext(3).expectNext(3).expectNext(4).expectNext(4)
                .expectNext(5).expectNext(5).expectNext(6).expectNext(6).verifyComplete();
        StepVerifier.create(flux.flatMap(x -> Mono.just(x))).expectNext(1).expectNext(2)
                .expectNext(3).expectNext(4).expectNext(5).expectNext(6).verifyComplete();
    }
    
  • Mono

    @Test
    public void canMapTheValueInSequenceToAnotherSequence() {
        StepVerifier.create(Mono.just(1).flatMapMany(x -> Flux.just(x, x))).expectNext(1)
                .expectNext(1).verifyComplete();
        StepVerifier.create(Mono.just(1).flatMap(x -> Mono.just(x + 1))).expectNext(2)
                .verifyComplete();
    }
    

怎么在没有数据的时候给一个默认值?

  • Flux

    @Test
    public void canRecoverWithSingleDefaultValueFromEmptySequence() {
        StepVerifier.<Integer>create(Flux.<Integer>empty().defaultIfEmpty(1)).expectNext(1)
                .verifyComplete();
    }
    
  • Mono

    @Test
    public void canRecoverWithSingleDefaultValueFromEmptySequence() {
        StepVerifier.<Integer>create(Mono.<Integer>empty().defaultIfEmpty(1)).expectNext(1)
                .verifyComplete();
    }
    

怎么在没有数据的时候将现在Flux|Mono替换为另一个Flux|Mono?

  • Flux

    @Test
    public void canRecoverWithAnotherSequenceFromEmptySequence() {
        StepVerifier.<Integer>create(Flux.<Integer>empty().switchIfEmpty(Flux.just(1)))
                .expectNext(1).verifyComplete();
    }
    
  • Mono

    @Test
    public void canRecoverWithAnotherSequenceFromEmptySequence() {
        StepVerifier.<Integer>create(Mono.<Integer>empty().switchIfEmpty(Mono.just(1)))
                .expectNext(1).verifyComplete();
    }
    

检视

怎么打印每一个元素?

咱们能够运用(Flux|Mono).doOnNext 来查看每个元素的值并做一些操作,但不会改动元素的值。

  • Flux

    @Test
    public void canDoSomethingForEveryElement() {
        Flux<Integer> flux = Flux.just(1, 2, 3, 4, 5, 6);
        flux.doOnNext(x -> System.out.println(x)).collectList().block();
    }
    
  • Mono

    @Test
    public void canDoSomethingForEveryElement() {
        Mono.just(1).doOnNext(x -> System.out.println(x)).block();
    }
    

拆箱

怎么将Flux转化为一个List?

@Test
public void canBeConvertedToList() {
    List<Integer> list = Flux.just(1, 2, 3).collectList().block();
    assertThat(list.size()).isEqualTo(3);
    assertThat(list.get(0)).isEqualTo(1);
    assertThat(list.get(1)).isEqualTo(2);
    assertThat(list.get(2)).isEqualTo(3);
}

怎么从Mono中取出数据?

@Test
public void canBeConvertedToValue() {
    assertThat(Mono.just(1).block()).isEqualTo(1);
    assertThat(Mono.empty().block()).isNull();
}

总结

我创建了一个repo reactor-examples,里面包含了上面所有的比如。

下载这个repo自己尝试一下吧!

git clone git@github.com:sjmyuan/reactor-examples.git

欢迎提交PR添加更多的比如。