Flux和Mono

上一篇笔记我们学习了Reactive Streams规范的基本概念以及如何自定义Publisher和Subscriber。实际上,在ReactorCore中,我们更倾向于使用框架提供的Publisher和Subscriber的抽象封装。

FluxMono就是ReactorCore中用于处理异步数据流的两个核心类型,它们都是一种Publisher,这篇笔记我们学习如何使用它们。

Flux和Mono基本概念

Flux是一个能够发出0-N个元素的Publisher,对应于Reactive Streams规范标准的Publisher,Flux会回调订阅者的onNext()方法传递数据,当回调onComplete()方法发出完成信号或回调onError()方法发出错误信号时,数据流终止。

Mono也是一种Publisher,但它最多发出1个元素,然后回调onComplete()方法发出完成信号或回调onError()方法发出错误信号。

虽然只生成1个元素的Flux也能表达Mono,但在ReactorCore中我们在这种情况下还是更倾向于使用Mono,Flux和Mono的区别在于表达的业务逻辑语义上,例如对于一个HTTP响应,如果返回的仅是单个数据实体元素,那么它显然适合使用Mono;而对于类似Server Send Events的响应,它返回的是流式的多个元素,那么显然更适合使用Flux。

Flux和Mono简单使用

创建Flux和Mono最简单的方法是使用这两个类提供的静态工具方法,例如just()可以根据一组已有数据创建有限的不可变的数据流,下面是一个例子。

package com.gacfox.demo;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Main {
    public static void main(String[] args) {
        Flux<String> flux = Flux.just("Tom", "Jerry", "Spike");
        flux.subscribe(System.out::println);

        Mono<String> mono = Mono.just("Apple");
        mono.subscribe(System.out::println);
    }
}

Flux和Mono对象提供了subscribe()方法,ReactorCore中该方法对Subscriber进行了封装,subscribe()方法接收各种不同的Lambda形式来定义接收到数据的回调,上面例子代码我们仅处理了收到数据的回调,我们也可以额外处理完成和错误信号,下面是一个例子。

package com.gacfox.demo;

import reactor.core.publisher.Flux;

public class Main {
    public static void main(String[] args) {
        Flux<String> flux = Flux.just("Tom", "Jerry", "Spike");
        flux.subscribe(System.out::println,
                err -> System.out.println("Error"),
                () -> System.out.println("Completed"));
    }
}

根据Reactive Streams规范,错误信号和完成信号都是终止信号,它们中的某一个会在数据流结束后被回调。

提示:开发阶段,我们可以使用flux.log()输出响应式流的调试信息,便于我们观察其内部的执行流程。

可编程式的创建Flux和Mono

just()方法不支持动态内容,它会在调用时立即捕获传入的值,并将其存储为不可变的数据流,这种数据流用途有限。实际开发中,我们的数据流一般都是动态生成的,Flux和Mono提供了几种封装方法可供我们使用,这通常又涉及到ReactorCore中Sink的概念。Sink是ReactorCore中用于动态发布数据到Flux或Mono的对象,它允许开发者以更细粒度的方式控制数据流的生成和推送。Sink具体如何使用我们可以看后面的例子。

generate()方法

创建动态Flux和Mono最简单的方式是使用generate()方法,它用于同步式的逐个产生数据流元素,此时我们需要用到SynchronousSink,我们调用它的next()方法生成数据,调用error()complete()终止数据流。

generate()提供了众多的重载方法,不过我们最常用的其实就是下面这个。

public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)

这个重载的函数签名乍一看可能比较复杂,我们可以按如下理解。其中Callable<S> stateSupplier用于提供状态的初始值,我们可以直接用Lambda表达式返回状态值或可变对象作为状态,BiFunction<S, SynchronousSink<T>, S> generator是一个函数式接口BiFunction<T, U, R>,在这里T是状态,U是Sink,R是返回值。

下面代码例子演示了如何使用该generate()方法的重载形式。

package com.gacfox.demo;

import reactor.core.publisher.Flux;

public class Main {
    public static void main(String[] args) {
        Flux<String> flux = Flux.generate(() -> 0,
                (state, sink) -> {
                    sink.next("Message " + state);
                    if (state >= 5) sink.complete();
                    return state + 1;
                });
        flux.subscribe(System.out::println);
    }
}

代码中,我们提供了0作为状态初始值,每次产生数据时,按顺序生成Message 0Message 1等字符串,直到状态值大于或等于5时结束数据流,注意generate()要求数据逐个产生,因此每次只能调用一次next()方法。

此时我们便创建了一个最简单的动态生成的数据流。至于Mono,因为Mono只会发出最多一个元素,因此它没有generate()方法。

create()方法

create()方法是一种更高级也更底层的创建Flux和Mono的写法,也是我们实际开发中较为常用的方式。create()方法既可以是同步的也可以是异步的,毕竟Sink本身是线程安全的,我们并不需要特别关心create()方法生成数据流的代码是同步还是异步(不过对于该方法来说,虽然也能实现同步生成数据流,但这没有什么意义,对于同步生成数据流我们应该使用generate()等方法),此外它还支持在一次回调中发出多个元素。

我们还是以Flux为例,此时生成元素我们需要用到FluxSink。下面代码我们将MyEventListener的异步回调方法桥接到了Flux上,这也是create()方法的主要使用方式。

package com.gacfox.demo;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class Main {
    public static void main(String[] args) {
        Flux<String> flux = Flux.create(sink -> {
            MyEventListener myEventListener = new MyEventListener(sink::next, sink::complete);
            new MyEventProcessor(myEventListener);
        }, FluxSink.OverflowStrategy.DROP);
        flux.subscribe(System.out::println);
    }
}

FluxSink是线程安全的,它可以并发的调用next()complete()error()方法,线程同步会在FluxSink内部处理,开发者无需做额外同步操作。

对于MyEventListenerMyEventProcessor我们这里并没有给出明确的实现代码,我们知道它会在某些情况下产生异步的回调事件即可,我们主要关注如何将MyEventListener的异步回调桥接到Flux上,代码其实很简单,就是将sink.next()sink.complete()设置为了MyEventListener的回调方法实现。

此外,我们这里还使用了FluxSink.OverflowStrategy.DROP背压策略,它会在下游没有准备好接收新的元素的时候丢弃这个元素。create()方法支持声明如下背压策略:

  • IGNORE:完全忽略下游背压请求,这可能会在下游队列满时抛出IllegalStateException
  • ERROR:当下游没有准备好接收新的元素时抛出IllegalStateException
  • DROP:当下游没有准备好接收新的元素时丢弃这个元素
  • LATEST:让下游只得到上游最新的元素
  • BUFFER:默认,缓存所有下游没有来得及处理的元素,但这可能导致OutOfMemoryError

FluxSink还提供了几个方法用于清理资源,sink.onCancel()sink.onDispose()在有错误出现或被取消的时候执行清理代码,sink.onCancel()仅针对取消执行操作,它先于sink.onDispose()执行。

push()方法是create()方法的一个变体,push()方法仅适用于单线程数据源(例如GUI事件监听器),我们需要确保在同一时间只有单个线程调用Sink的方法。

此外对于Mono,它也支持create()方法,Mono中我们需要使用MonoSink,它不能生成多个元素(实际上会抛弃第一个元素之后的所有元素),除此之外和FluxSink没有区别。

其它创建方法

除了最基础的just()generate()create(),Mono和Flux还有很多实用的创建方法,这里我们简单介绍一些。

Flux.range()可以用于生成整数序列,简单的生成整数序列需求可以替代之前介绍的generate()

Flux<Integer> flux = Flux.range(0, 5);

Flux.fromIterable()支持从可迭代对象中创建数据流。

List<Integer> list = Arrays.asList(0, 1, 2, 3, 4);
Flux<Integer> flux = Flux.fromIterable(list);

Flux.fromStream()支持从Java8的Stream对象中创建数据流。

List<Integer> list = Arrays.asList(0, 1, 2, 3, 4);
Stream<Integer> stream = list.stream().map(x -> x * 2);
Flux<Integer> flux = Flux.fromStream(stream);

Flux.interval()方法可以按固定时间间隔发射数据,下面例子实现按1秒的间隔发射01234

Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);

Mono.fromSupplier()是惰性求值方法,它会在调用时再触发内部的计算逻辑。

Mono<String> mono = Mono.fromSupplier(() -> "Hello");

Mono.fromCallable()Mono.fromSupplier()类似,也是一种惰性求值方法,但它支持处理受检查异常。

Mono<String> mono = Mono.fromCallable(() -> "Hello")
        .onErrorResume(e -> Mono.just("fallback"));

Mono.fromFuture()可以从一个Java8的CompletableFuture异步任务中创建,下面是一个例子。

Mono<String> mono = Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
    // ... 假设这里进行了某些耗时操作
    return "data";
}));

错误处理

在响应式数据流中,处理错误涉及对Subscriber的onError()方法的调用,ReactorCore对其进行了封装,我们可以用更简单的写法实现细致的错误处理。subscribe()方法实际上支持3个参数,分别对应于Subscriber的元素处理事件、异常事件和完成事件。

下面代码中,1 / 0会抛出异常,因此最后会打印Error!

Flux.generate(sink -> sink.next(1 / 0))
        .subscribe(System.out::println,
                (e) -> System.out.println("Error!"),
                () -> System.out.println("Complete!"));

如果我们想要在数据流中间对异常进行处理,我们可以使用onErrorResume()onErrorReturn()方法。

onErrorResume()方法可以让数据流发生错误时切换到另一个Publisher,下面是一个例子。

Flux.generate(sink -> sink.next(1 / 0))
        .onErrorResume(e -> Mono.just(0))
        .subscribe(System.out::println);

onErrorReturn()方法则可以在数据流发生错误时返回一个默认值。

Flux.generate(sink -> sink.next(1 / 0))
        .onErrorReturn(0)
        .subscribe(System.out::println);

doOnError()方法则可以在异常发生时执行副作用操作,例如打印一条日志信息。

Flux.generate(sink -> sink.next(1 / 0))
        .doOnError((e) -> System.out.println("Error: " + e.getMessage()))
        .onErrorReturn(0)
        .subscribe(System.out::println);

ReactorCore还实现了重试功能,下面例子中我们使用retry()实现当流发生错误时重试3次再执行后续流程的功能。

Flux.generate(sink -> sink.next(1 / 0))
        .retry(3)
        .subscribe(System.out::println);
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap