流处理操作符

前面我们介绍过,ReactorCore中响应式流类似一种声明式的数据处理管线,创建了Publisher后为了拼接多个流程,这就涉及到流的转换过滤组合操作。ReactorCore框架提供了丰富的流操作符来构建这些逻辑。

流转换

map

map操作符能将数据流中的每个元素同步的转换为新值,下面例子将输出123乘以2的序列。

Flux.just(1, 2, 3)
        .map(i -> i * 2)
        .subscribe(System.out::println);

flatmap

flatmap也用于转换流中的元素,但和map不同的是它会将元素转换为Publisher,flatmap可以实现并发的订阅,这能够实现组合多个异步操作,实现更复杂的调度策略。

Flux.just(1, 2, 3)
        .subscribeOn(Schedulers.parallel())
        .flatMap(i -> Mono.just(i * 2))
        .subscribe(System.out::println);

flatmap由于可以是异步并发执行的,最终元素的输出顺序可能与源流的元素顺序不同,它适用于需要高并发处理,但不关心最终结果顺序的场景。

flatMapSequential

flatMapSequentialflatmap类似,flatMapSequential处理流中的每个元素时会并发地订阅生成的Publisher,但最终的合并结果会保持源流的顺序,适用于日志流式处理等场景。

流过滤

filter

filter操作符用于根据提供的条件过滤流中的元素。下面例子代码过滤去除了流中的1

Flux.just(1, 2, 3)
        .filter((x) -> x >= 2)
        .subscribe(System.out::println);

take

take操作符用于取流中的前N个元素,下面例子中,我们取了无限实现序列流中的5个元素。

Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(5);

takeLast

takeLast操作符用于取流中的后N个元素,这个操作会缓存流中的元素,仅适用于有限流。

Flux.just(1, 2, 3)
        .takeLast(2)
        .subscribe(System.out::println);

skip

skip操作符用于跳过流中前N个元素。

Flux.just(1, 2, 3)
        .skip(2)
        .subscribe(System.out::println);

distinct

distinct操作符可以实现对流中的内容进行去重,只有未出现过的元素才会传入后续流程。

Flux.just(1, 1, 2, 2, 3, 3)
        .distinct()
        .subscribe(System.out::println);

流组合

merge

merge操作符用于将多个数据流合并为一个数据流,merge可以并发执行,因此无法保证结果的顺序。

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Mono<Integer> mono = Mono.just(7);
Flux.merge(flux1, flux2, mono).subscribe(System.out::println);

concat

concat也是合并数据流,但和merge不同,concat是串行执行的,它会等待当前Publisher完成后再订阅下一个Publisher

zip

zip操作符可以将多个Publisher的元素按顺序组合到一起,这描述起来有些抽象,我们直接看下面的例子。

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux.zip(flux1, flux2).subscribe(System.out::println);

这段代码中,我们传入的序列是[1, 2, 3][4, 5, 6],它们zip操作后,会按照元素在序列中的索引位置组合,最终输出三个元素:[1, 4][2, 5][3, 6]

zip也支持传入一个Lambda表达式用于处理组合逻辑,下面我们将元素组合为字符串。

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Flux
        .zip(flux1, flux2, (a, b) -> a + " & " + b)
        .subscribe(System.out::println);

另外一点我们要注意,如果传入的数据流长度不同,一旦某个数据流率先结束,那么zip操作也就结束了,下面例子会输出单个元素,数组[1, 4, 7]

Flux<Integer> flux1 = Flux.just(1, 2, 3);
Flux<Integer> flux2 = Flux.just(4, 5, 6);
Mono<Integer> mono = Mono.just(7);
Flux.zip(flux1, flux2, mono).subscribe(System.out::println);

reduce

reduce操作符比较特殊,它可以聚合数据流,即将Flux聚合成Mono。reduce不会是并发的,它是一个串行操作,会按照顺序逐次执行聚合逻辑。

Flux.just(1, 2, 3).reduce(0, Integer::sum).subscribe(System.out::println);

reduce操作符乍一看可能比较复杂,其方法签名如下。

public final <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

其中,A是初始值也是最终返回值的类型,BiFunction中第1个参数是上一次聚合的结果(首次为初始值),第2个参数是集合中的每一个元素。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap