FlowAPI 响应式流

JDK9中引入了FlowAPI,其包含了PublisherSubscriberSubscriptionProcessor4个核心接口,同时内置了一个简单的实现SubmissionPublisher。FlowAPI是实现Reactive Stream(响应式流)的标准规范。相比传统的多线程模型,异步响应式流模型编程更为简单且不易出错,更容易写出高性能的代码。

Reactive Stream广泛用于客户端的GUI消息处理、异步接口调用等,服务端则主要用在高性能网关等场景。具体使用时,FlowAPI自带的SubmissionPublisher功能比较单薄,我们一般还是配合RxJavaReactorCore等库进行使用,这些库的新版本也已经适配了FlowAPI。

有关什么是Reactive Stream还可以参考RxJava相关章节,RxJava是一个功能完整的异步响应式流编程框架,其出现早于JDK9的FlowAPI,但本质上是描述的同一个东西。

基本概念

这里我们先了解一下FlowAPI中的基本概念。

Publisher:发布者,用于发布消息。发布者分为multicast(多播)和unicast(单播)两种,前者发布的1条消息所有订阅者都会收到,后者则只有一个订阅者会收到,JDK9自带的SubmissionPublisher只支持多播。

Subscriber:订阅者,用于接收消息并进行相关的业务逻辑。

Subscription:发布订阅关系对象,建立订阅关系后由框架返回。

Processor:可选的处理器对象,如图所示它位于发布者和订阅者之间,处理器从发布者中接收消息,进行相应处理后在发给订阅者,因此实际上处理器既是订阅者也是发布者,承担了一个流处理和转换的角色。

Back Pressure:背压,这是一种限制发布者发布消息数量的机制。在发布订阅模型中,发布者发出的消息通常会写入缓冲区并由订阅者逐条处理,但如果发布者发布消息过快就会造成系统资源耗尽。背压机制则实现了当缓冲区已满后,发布者暂停发布消息,当消息被消费后再继续发布,因此能够避免资源耗尽的情况出现。

发布订阅

下面例子代码中,我们使用了JDK9自带的SubmissionPublisher实现了一个响应式编程的例子。代码中,我们的发布者不断发布字符串消息,订阅者则负责消费这些消息,并打印出来。

ReactiveSubscriber.java

package com.gacfox.demo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Flow;

/**
 * 订阅者例子
 */
@Slf4j
public class ReactiveSubscriber implements Flow.Subscriber<String> {
    /**
     * 订阅关系对象
     */
    private Flow.Subscription subscription;

    /**
     * 建立订阅关系时回调
     *
     * @param subscription 发布订阅关系对象
     */
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        log.info("建立订阅关系");
        this.subscription = subscription;
        this.subscription.request(1);
    }

    /**
     * 订阅者每次处理数据时回调
     *
     * @param item 要处理的数据
     */
    @Override
    public void onNext(String item) {
        // 这里为了观察效果,手动让线程睡眠1秒
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        log.info("接收数据:{}", item);
        // 数据处理完后,再次请求1条数据
        this.subscription.request(1);
    }

    /**
     * 发生异常时回调
     *
     * @param throwable 异常对象
     */
    @Override
    public void onError(Throwable throwable) {
        log.info("程序出错");
        this.subscription.cancel();
    }

    /**
     * 数据全部处理完成后回调
     */
    @Override
    public void onComplete() {
        log.info("程序完成");
    }
}

上面代码我们使用了Flow.Subscriber接口,它具有4个方法需要实现,最重要的就是onNext方法,其中包含我们的业务逻辑,这里我们为了观察效果手动让线程睡眠1秒,模拟消息消费速度低于发布速度的场景。注意onNext方法最后我们调用了subscription.request()方法,该方法表示请求到指定条消息后再继续处理。

其余具体见代码注释,这里就不赘述了。

Main.java

package com.gacfox.demo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

@Slf4j
public class Main {
    public static void main(String[] args) {
        // 创建发布者
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            // 创建订阅者
            Flow.Subscriber<String> subscriber = new ReactiveSubscriber();
            // 建立发布订阅关系
            publisher.subscribe(subscriber);

            // 创建订阅者
            Flow.Subscriber<String> subscriber2 = new ReactiveSubscriber();
            // 建立发布订阅关系
            publisher.subscribe(subscriber2);

            // 发布数据
            for (int i = 0; i < 1000; i++) {
                publisher.submit("data" + i);
                log.info("发送消息" + "data" + i);
            }

            // 发布者发布结束
            publisher.close();

            // 阻塞主线程防止程序退出
            Thread.currentThread().join();

        } catch (Exception e) {
            log.error("error: ", e);
        }
    }
}

上面代码中,我们先实例化了JDK9自带的SubmissionPublisher,注意该对象实现了AutoCloseable接口,因此这里我们使用了try-with-resource写法;然后又创建了我们自己编写的ReactiveSubscriber对象,并建立了订阅关系,发布者陆续发布1000条消息后,结束发布。

实际运行时,我们会发现发布者在发布256条消息后就阻塞了,因为此时发布速度远大于消费速度,缓冲区已满,这就是之前所说的Back Pressure机制。最后随着消费完成后,订阅者的onComplete()方法被回调。

处理器

FlowAPI中,如果我们需要在发布者和订阅者之间做一些转换操作,就需要用到Processor处理器,这个和JDK8中StreamAPI的流数据处理思想有些类似。Processor既是订阅者也是发布者,承担了一个流处理和转换的角色,下面例子中我们实现了一个简单的处理器,将字符串消息转化为了大写字母。

ReactiveProcessor.java

package com.gacfox.demo;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

@Slf4j
public class ReactiveProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        log.info("建立订阅关系");
        this.subscription = subscription;
        this.subscription.request(1);
    }

    @Override
    public void onNext(String item) {
        log.info("处理数据:{}", item);
        this.submit(item.toUpperCase());
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        log.info("程序出错");
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {
        log.info("程序完成");
    }
}

Main.java

// 创建发布者和处理器
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        ReactiveProcessor processor = new ReactiveProcessor()) {
    // 创建订阅者
    Flow.Subscriber<String> subscriber = new ReactiveSubscriber();
    // 建立发布订阅关系
    publisher.subscribe(processor);
    processor.subscribe(subscriber);

    // 后续发布消息等业务逻辑...

}

代码中,我们的处理器订阅了发布者,而订阅者订阅了处理器,这个很容易理解,这里就不过多解释了。最终运行时,我们会发现订阅者收到的数据已经被转换为了大写字母。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。