前面我们介绍过,ReactorCore中响应式流类似一种声明式的数据处理管线,创建了Publisher后为了拼接多个流程,这就涉及到流的转换、过滤与组合操作。ReactorCore框架提供了丰富的流操作符来构建这些逻辑。
map
操作符能将数据流中的每个元素同步的转换为新值,下面例子将输出1
、2
、3
乘以2
的序列。
Flux.just(1, 2, 3)
.map(i -> i * 2)
.subscribe(System.out::println);
flatmap
也用于转换流中的元素,但和map
不同的是它会将元素转换为Publisher,flatmap
可以实现并发的订阅,这能够实现组合多个异步操作,实现更复杂的调度策略。
Flux.just(1, 2, 3)
.subscribeOn(Schedulers.parallel())
.flatMap(i -> Mono.just(i * 2))
.subscribe(System.out::println);
flatmap
由于可以是异步并发执行的,最终元素的输出顺序可能与源流的元素顺序不同,它适用于需要高并发处理,但不关心最终结果顺序的场景。
flatMapSequential
和flatmap
类似,flatMapSequential
处理流中的每个元素时会并发地订阅生成的Publisher,但最终的合并结果会保持源流的顺序,适用于日志流式处理等场景。
filter
操作符用于根据提供的条件过滤流中的元素。下面例子代码过滤去除了流中的1
。
Flux.just(1, 2, 3)
.filter((x) -> x >= 2)
.subscribe(System.out::println);
take
操作符用于取流中的前N个元素,下面例子中,我们取了无限实现序列流中的5个元素。
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);
takeLast
操作符用于取流中的后N个元素,这个操作会缓存流中的元素,仅适用于有限流。
Flux.just(1, 2, 3)
.takeLast(2)
.subscribe(System.out::println);
skip
操作符用于跳过流中前N个元素。
Flux.just(1, 2, 3)
.skip(2)
.subscribe(System.out::println);
distinct
操作符可以实现对流中的内容进行去重,只有未出现过的元素才会传入后续流程。
Flux.just(1, 1, 2, 2, 3, 3)
.distinct()
.subscribe(System.out::println);
merge
操作符用于将多个数据流合并为一个数据流,merge
可以并发执行,因此无法保证结果的顺序。
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Mono<Integer> mono = Mono.just(7);
Flux.merge(flux1, flux2, mono).subscribe(System.out::println);
concat
也是合并数据流,但和merge
不同,concat
是串行执行的,它会等待当前Publisher
完成后再订阅下一个Publisher
。
zip
操作符可以将多个Publisher的元素按顺序组合到一起,这描述起来有些抽象,我们直接看下面的例子。
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux.zip(flux1, flux2).subscribe(System.out::println);
这段代码中,我们传入的序列是[1, 2, 3]
和[4, 5, 6]
,它们zip
操作后,会按照元素在序列中的索引位置组合,最终输出三个元素:[1, 4]
,[2, 5]
,[3, 6]
。
zip
也支持传入一个Lambda表达式用于处理组合逻辑,下面我们将元素组合为字符串。
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux
.zip(flux1, flux2, (a, b) -> a + " & " + b)
.subscribe(System.out::println);
另外一点我们要注意,如果传入的数据流长度不同,一旦某个数据流率先结束,那么zip
操作也就结束了,下面例子会输出单个元素,数组[1, 4, 7]
。
Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Mono<Integer> mono = Mono.just(7);
Flux.zip(flux1, flux2, mono).subscribe(System.out::println);
reduce
操作符比较特殊,它可以聚合数据流,即将Flux聚合成Mono。reduce
不会是并发的,它是一个串行操作,会按照顺序逐次执行聚合逻辑。
Flux.just(1, 2, 3).reduce(0, Integer::sum).subscribe(System.out::println);
reduce
操作符乍一看可能比较复杂,其方法签名如下。
public final <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);
其中,A
是初始值也是最终返回值的类型,BiFunction
中第1个参数是上一次聚合的结果(首次为初始值),第2个参数是集合中的每一个元素。