上一篇笔记我们学习了Reactive Streams规范的基本概念以及如何自定义Publisher和Subscriber。实际上,在ReactorCore中,我们更倾向于使用框架提供的Publisher和Subscriber的抽象封装。
Flux
和Mono
就是ReactorCore中用于处理异步数据流的两个核心类型,它们都是一种Publisher,这篇笔记我们学习如何使用它们。
Flux是一个能够发出0-N个元素的Publisher,对应于Reactive Streams规范标准的Publisher,Flux会回调订阅者的onNext()
方法传递数据,当回调onComplete()
方法发出完成信号或回调onError()
方法发出错误信号时,数据流终止。
Mono也是一种Publisher,但它最多发出1个元素,然后回调onComplete()
方法发出完成信号或回调onError()
方法发出错误信号。
虽然只生成1个元素的Flux也能表达Mono,但在ReactorCore中我们在这种情况下还是更倾向于使用Mono,Flux和Mono的区别在于表达的业务逻辑语义上,例如对于一个HTTP响应,如果返回的仅是单个数据实体元素,那么它显然适合使用Mono;而对于类似Server Send Events的响应,它返回的是流式的多个元素,那么显然更适合使用Flux。
创建Flux和Mono最简单的方法是使用这两个类提供的静态工具方法,例如just()
可以根据一组已有数据创建有限的不可变的数据流,下面是一个例子。
package com.gacfox.demo;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class Main {
public static void main(String[] args) {
Flux<String> flux = Flux.just("Tom", "Jerry", "Spike");
flux.subscribe(System.out::println);
Mono<String> mono = Mono.just("Apple");
mono.subscribe(System.out::println);
}
}
Flux和Mono对象提供了subscribe()
方法,ReactorCore中该方法对Subscriber进行了封装,subscribe()
方法接收各种不同的Lambda形式来定义接收到数据的回调,上面例子代码我们仅处理了收到数据的回调,我们也可以额外处理完成和错误信号,下面是一个例子。
package com.gacfox.demo;
import reactor.core.publisher.Flux;
public class Main {
public static void main(String[] args) {
Flux<String> flux = Flux.just("Tom", "Jerry", "Spike");
flux.subscribe(System.out::println,
err -> System.out.println("Error"),
() -> System.out.println("Completed"));
}
}
根据Reactive Streams规范,错误信号和完成信号都是终止信号,它们中的某一个会在数据流结束后被回调。
提示:开发阶段,我们可以使用flux.log()
输出响应式流的调试信息,便于我们观察其内部的执行流程。
just()
方法不支持动态内容,它会在调用时立即捕获传入的值,并将其存储为不可变的数据流,这种数据流用途有限。实际开发中,我们的数据流一般都是动态生成的,Flux和Mono提供了几种封装方法可供我们使用,这通常又涉及到ReactorCore中Sink的概念。Sink是ReactorCore中用于动态发布数据到Flux或Mono的对象,它允许开发者以更细粒度的方式控制数据流的生成和推送。Sink具体如何使用我们可以看后面的例子。
generate()
方法创建动态Flux和Mono最简单的方式是使用generate()
方法,它用于同步式的逐个产生数据流元素,此时我们需要用到SynchronousSink
,我们调用它的next()
方法生成数据,调用error()
或complete()
终止数据流。
generate()
提供了众多的重载方法,不过我们最常用的其实就是下面这个。
public static <T, S> Flux<T> generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator)
这个重载的函数签名乍一看可能比较复杂,我们可以按如下理解。其中Callable<S> stateSupplier
用于提供状态的初始值,我们可以直接用Lambda表达式返回状态值或可变对象作为状态,BiFunction<S, SynchronousSink<T>, S> generator
是一个函数式接口BiFunction<T, U, R>
,在这里T是状态,U是Sink,R是返回值。
下面代码例子演示了如何使用该generate()
方法的重载形式。
package com.gacfox.demo;
import reactor.core.publisher.Flux;
public class Main {
public static void main(String[] args) {
Flux<String> flux = Flux.generate(() -> 0,
(state, sink) -> {
sink.next("Message " + state);
if (state >= 5) sink.complete();
return state + 1;
});
flux.subscribe(System.out::println);
}
}
代码中,我们提供了0
作为状态初始值,每次产生数据时,按顺序生成Message 0
、Message 1
等字符串,直到状态值大于或等于5
时结束数据流,注意generate()
要求数据逐个产生,因此每次只能调用一次next()
方法。
此时我们便创建了一个最简单的动态生成的数据流。至于Mono,因为Mono只会发出最多一个元素,因此它没有generate()
方法。
create()
方法create()
方法是一种更高级也更底层的创建Flux和Mono的写法,也是我们实际开发中较为常用的方式。create()
方法既可以是同步的也可以是异步的,毕竟Sink本身是线程安全的,我们并不需要特别关心create()
方法生成数据流的代码是同步还是异步(不过对于该方法来说,虽然也能实现同步生成数据流,但这没有什么意义,对于同步生成数据流我们应该使用generate()
等方法),此外它还支持在一次回调中发出多个元素。
我们还是以Flux为例,此时生成元素我们需要用到FluxSink
。下面代码我们将MyEventListener
的异步回调方法桥接到了Flux上,这也是create()
方法的主要使用方式。
package com.gacfox.demo;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
public class Main {
public static void main(String[] args) {
Flux<String> flux = Flux.create(sink -> {
MyEventListener myEventListener = new MyEventListener(sink::next, sink::complete);
new MyEventProcessor(myEventListener);
}, FluxSink.OverflowStrategy.DROP);
flux.subscribe(System.out::println);
}
}
FluxSink
是线程安全的,它可以并发的调用next()
、complete()
和error()
方法,线程同步会在FluxSink
内部处理,开发者无需做额外同步操作。
对于MyEventListener
和MyEventProcessor
我们这里并没有给出明确的实现代码,我们知道它会在某些情况下产生异步的回调事件即可,我们主要关注如何将MyEventListener
的异步回调桥接到Flux上,代码其实很简单,就是将sink.next()
和sink.complete()
设置为了MyEventListener
的回调方法实现。
此外,我们这里还使用了FluxSink.OverflowStrategy.DROP
背压策略,它会在下游没有准备好接收新的元素的时候丢弃这个元素。create()
方法支持声明如下背压策略:
IGNORE
:完全忽略下游背压请求,这可能会在下游队列满时抛出IllegalStateException
ERROR
:当下游没有准备好接收新的元素时抛出IllegalStateException
DROP
:当下游没有准备好接收新的元素时丢弃这个元素LATEST
:让下游只得到上游最新的元素BUFFER
:默认,缓存所有下游没有来得及处理的元素,但这可能导致OutOfMemoryError
FluxSink
还提供了几个方法用于清理资源,sink.onCancel()
、sink.onDispose()
在有错误出现或被取消的时候执行清理代码,sink.onCancel()
仅针对取消执行操作,它先于sink.onDispose()
执行。
push()
方法是create()
方法的一个变体,push()
方法仅适用于单线程数据源(例如GUI事件监听器),我们需要确保在同一时间只有单个线程调用Sink的方法。
此外对于Mono,它也支持create()
方法,Mono中我们需要使用MonoSink
,它不能生成多个元素(实际上会抛弃第一个元素之后的所有元素),除此之外和FluxSink
没有区别。
除了最基础的just()
、generate()
和create()
,Mono和Flux还有很多实用的创建方法,这里我们简单介绍一些。
Flux.range()
可以用于生成整数序列,简单的生成整数序列需求可以替代之前介绍的generate()
。
Flux<Integer> flux = Flux.range(0, 5);
Flux.fromIterable()
支持从可迭代对象中创建数据流。
List<Integer> list = Arrays.asList(0, 1, 2, 3, 4);
Flux<Integer> flux = Flux.fromIterable(list);
Flux.fromStream()
支持从Java8的Stream对象中创建数据流。
List<Integer> list = Arrays.asList(0, 1, 2, 3, 4);
Stream<Integer> stream = list.stream().map(x -> x * 2);
Flux<Integer> flux = Flux.fromStream(stream);
Flux.interval()
方法可以按固定时间间隔发射数据,下面例子实现按1秒的间隔发射0
、1
、2
、3
、4
。
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);
Mono.fromSupplier()
是惰性求值方法,它会在调用时再触发内部的计算逻辑。
Mono<String> mono = Mono.fromSupplier(() -> "Hello");
Mono.fromCallable()
与Mono.fromSupplier()
类似,也是一种惰性求值方法,但它支持处理受检查异常。
Mono<String> mono = Mono.fromCallable(() -> "Hello")
.onErrorResume(e -> Mono.just("fallback"));
Mono.fromFuture()
可以从一个Java8的CompletableFuture
异步任务中创建,下面是一个例子。
Mono<String> mono = Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
// ... 假设这里进行了某些耗时操作
return "data";
}));
在响应式数据流中,处理错误涉及对Subscriber的onError()
方法的调用,ReactorCore对其进行了封装,我们可以用更简单的写法实现细致的错误处理。subscribe()
方法实际上支持3个参数,分别对应于Subscriber的元素处理事件、异常事件和完成事件。
下面代码中,1 / 0
会抛出异常,因此最后会打印Error!
。
Flux.generate(sink -> sink.next(1 / 0))
.subscribe(System.out::println,
(e) -> System.out.println("Error!"),
() -> System.out.println("Complete!"));
如果我们想要在数据流中间对异常进行处理,我们可以使用onErrorResume()
和onErrorReturn()
方法。
onErrorResume()
方法可以让数据流发生错误时切换到另一个Publisher,下面是一个例子。
Flux.generate(sink -> sink.next(1 / 0))
.onErrorResume(e -> Mono.just(0))
.subscribe(System.out::println);
onErrorReturn()
方法则可以在数据流发生错误时返回一个默认值。
Flux.generate(sink -> sink.next(1 / 0))
.onErrorReturn(0)
.subscribe(System.out::println);
doOnError()
方法则可以在异常发生时执行副作用操作,例如打印一条日志信息。
Flux.generate(sink -> sink.next(1 / 0))
.doOnError((e) -> System.out.println("Error: " + e.getMessage()))
.onErrorReturn(0)
.subscribe(System.out::println);
ReactorCore还实现了重试功能,下面例子中我们使用retry()
实现当流发生错误时重试3次再执行后续流程的功能。
Flux.generate(sink -> sink.next(1 / 0))
.retry(3)
.subscribe(System.out::println);