Reactor反应式编程

Reactor是Pivotal公司开源的一款ReactiveStream反应式流编程框架,它可以理解为Java8的StreamAPI加上Java9的FlowAPI的另一个实现,RxJava也是一个同类的框架,RxJava主要用于Android等客户端编程,服务端的一些中间件如Hystrix等也是使用RxJava实现的。不过Spring中所有的异步编程模型都是基于Reactor来实现,Reactor中的核心依赖ReactorCore就是响应式异步编程的具体实现,这篇笔记我们简单介绍ReactorCore中相关的概念和API的使用。

本篇笔记不会再重复介绍Reactive编程相关的基础了,此外在学习本篇笔记之前你还应该掌握Java8 StreamAPI、Java8 CompletableFuture、Java9 FlowAPI,以及Java并发编程相关内容,这些内容都可以参考笔记的对应章节。

ReactorCore相关概念

  • 发布者:发布者实现了org.reactivestreams.Publisher接口,用于发布元素。SpringWebFlux中常用的有Mono和Flux两种。
  • Mono:代表0-1个元素的发布者。
  • Flux:代表0-N个元素的发布者,SpringWebFlux中还可以用于生成HTTP协议中的SSE事件流响应。
  • 订阅者(消费者):订阅者从发布者中读取并处理元素,SpringWebFlux框架本身就是消费者。
  • 背压:一种订阅者限制发布者发布元素数量的机制,用于避免资源耗尽的情况出现。

在使用SpringWebFlux进行服务端程序开发时,其实我们关注发布者就行了,因为订阅者就是框架本身。我们几乎不能在自己的代码中消费元素,因为这会阻塞线程,破坏了响应式的代码逻辑,所有对具体元素的操作都应该使用流操作符来实现。发布者对象的来源可以是一个现成的对象或数组,也可能是来自WebClient或SpringDataReactive的返回值,我们可以使用流操作符将这些内容组合到一起,生成最终的返回值给SpringWebFlux消费。

ReactorCore的使用

下面我们介绍一些ReactorCore在日常开发中的常见用法。

创建发布者

直接从变量创建

最常用的发布者就是Mono和Flux,我们可以直接从对象变量中创建Mono和Flux,只需要调用just静态方法就行了。

Mono<String> mono = Mono.just("data1");
Flux<String> flux = Flux.just("data1", "data2", "data3");

此外Flux也支持一些其它的创建方式重载,比如从数组创建,或是从Stream(JDK8)创建,这里就不逐一列举了。

Flux<String> flux = Flux.fromArray(new String[]{"data1", "data2", "data3"});

从异步任务创建

对于Mono和Flux,实际上更常见的情况是从异步任务来创建。Java中异步回调比较传统的方式是基于接口实现的。

Mono<String> mono = Mono.create((sink) -> {
    DemoAsyncTask demoAsyncTask = new DemoAsyncTask();
    demoAsyncTask.startTask(new Callback() {
        @Override
        public void onSuccess(String result) {
            sink.success(result);
        }

        @Override
        public void onFailure(Throwable ex) {
            sink.error(ex);
        }
    });
});

上面例子中我们启动了一个异步任务,并通过MonoSink对象关联和任务完成(成功或失败)时和发布者Mono之间的关系。

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return "data0";
});

Mono<String> mono = Mono.fromFuture(completableFuture);

JDK8增加了CompletableFuture的新特性,异步任务采用这种方式实现其可读性比传统的基于接口实现的异步回调要好一些,Mono也支持fromFuture()方法直接从CompletableFuture创建,上面代码演示了从CompletableFuture创建Mono的写法。

在实际开发中,其实我们的Mono和Flux更有可能来自WebClient发起的HTTP微服务调用,或是SpringDataReactive查询数据库后的返回值,这些都是异步操作,Spring官方的响应式框架都会封装好Mono和Flux作为返回值给我们使用,但一些未集成ReactorCore的框架则可能采用上面两种方式返回异步结果。

消费元素

SpringWebFlux中,框架本身就是消费者,我们的代码中理论上不应该出现任何消费操作。

流操作

ReactiveStream中的流操作虽然看上去很抽象,但用熟了其实就那么几种,如果使用过其它的响应式框架,那么就一定听说过filtermapflatmapreduce等。我们这里介绍一些实际开发中比较常用的操作符。

map

map操作符用于单纯的处理数据,它的参数是一个Function对象,用于对Flux序列中的元素进行逐一处理,所有的返回值生成一个新的序列。

Flux<String> flux = Flux
        .fromArray(new String[]{"data1", "data2", "data3"})
        .map((s) -> {
            return s.toUpperCase();
        });

上面代码我们把所有的字符串转成了大写。这种写法其实和传统的遍历、修改效果上相同,但在Reactive编程中我们要操作的元素通常处于反应式流中,我们无法将元素消费出来操作,因此需要使用map

flatmap

flatmap用于调用异步操作对数据进行处理,和map不同的是flatmapFunction参数返回值必须是org.reactivestreams.Publisher的实现。说白了就是flatmap里面必须也是一个返回Mono或Flux的异步操作,flatmap通常是用来组合多个异步操作的。

Flux<String> flux = Flux
        .fromArray(new String[]{"1", "2", "3"})
        .flatMap((s) -> {
            DemoAsyncTask demoAsyncTask = new DemoAsyncTask();
            return demoAsyncTask.queryDataById(s);
        });

上面代码中的demoAsyncTask.queryDataById()就是一个返回Mono的异步操作,对于flux序列中是3个元素会并发执行。如果我们调用的异步操作没有返回Mono而是采用CompletableFuture或是更传统的基于接口的异步回调函数写法,我们也可以使用Mono.fromFuture()或是Mono.create()进行封装。

reduce

reduce操作可以将Flux聚合成Mono,下面是一个例子。

Mono<Integer> mono = Flux
        .fromArray(new Integer[]{1, 2, 3})
        .reduce(0, (data, i) -> {
            return data + i;
        });

reduce可能乍一看比较复杂,其方法签名如下:

public final <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);

其中,A是初始值也是最终返回值的类型,BiFunction中第1个参数是上一次聚合的结果(首次为初始值),第2个参数是集合中的每一个元素。

zip

zip可以将多个异步操作合并成一个操作,下面是其用法。

DemoAsyncTask demoAsyncTask = new DemoAsyncTask();
Mono<String> mono = Mono
        .zip(demoAsyncTask.queryDataById("1"), demoAsyncTask.queryDataById("2"), demoAsyncTask.queryDataById("3"))
        .map((tuple3) -> {
            return tuple3.getT1() + tuple3.getT2() + tuple3.getT3();
        });

代码中,我们调用了3次demoAsyncTask.queryDataById()异步操作,这些异步操作会返回Mono<String>,而我们使用zip将3个操作组合成了1个Mono<Tuple3<T1, T2, T3>>,最终使用map收集结果并转换为了1个Mono<String>zip操作提供了很多重载,包括的Tuple2Tuple8能够支持最多8个异步操作,当然更多也不是不行,只是没有类型安全的TupleN可以使用了,实际开发中也很少出现这种情况。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap