FlowAPI 响应式流
JDK9中引入了FlowAPI,其包含了Publisher、Subscriber、Subscription、Processor4个核心接口,同时内置了一个简单的实现SubmissionPublisher。FlowAPI是实现Reactive Stream(响应式流)的标准规范。相比传统的多线程模型,异步响应式流模型编程更为简单且不易出错,更容易写出高性能的代码。
Reactive Stream广泛用于客户端的GUI消息处理、异步接口调用等,服务端则主要用在高性能网关等场景。具体使用时,FlowAPI自带的SubmissionPublisher功能比较单薄,我们一般还是配合RxJava、ReactorCore等库进行使用,这些库的新版本也已经适配了FlowAPI。
有关什么是Reactive Stream还可以参考RxJava相关章节,RxJava是一个功能完整的异步响应式流编程框架,其出现早于JDK9的FlowAPI,但本质上是描述的同一个东西。
基本概念
这里我们先了解一下FlowAPI中的基本概念。

Publisher:发布者,用于发布消息。发布者分为multicast(多播)和unicast(单播)两种,前者发布的1条消息所有订阅者都会收到,后者则只有一个订阅者会收到,JDK9自带的SubmissionPublisher只支持多播。
Subscriber:订阅者,用于接收消息并进行相关的业务逻辑。
Subscription:发布订阅关系对象,建立订阅关系后由框架返回。
Processor:可选的处理器对象,如图所示它位于发布者和订阅者之间,处理器从发布者中接收消息,进行相应处理后在发给订阅者,因此实际上处理器既是订阅者也是发布者,承担了一个流处理和转换的角色。
Back Pressure:背压,这是一种限制发布者发布消息数量的机制。在发布订阅模型中,发布者发出的消息通常会写入缓冲区并由订阅者逐条处理,但如果发布者发布消息过快就会造成系统资源耗尽。背压机制则实现了当缓冲区已满后,发布者暂停发布消息,当消息被消费后再继续发布,因此能够避免资源耗尽的情况出现。
发布订阅
下面例子代码中,我们使用了JDK9自带的SubmissionPublisher实现了一个响应式编程的例子。代码中,我们的发布者不断发布字符串消息,订阅者则负责消费这些消息,并打印出来。
ReactiveSubscriber.java
package com.gacfox.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow;
/**
* 订阅者例子
*/
@Slf4j
public class ReactiveSubscriber implements Flow.Subscriber<String> {
/**
* 订阅关系对象
*/
private Flow.Subscription subscription;
/**
* 建立订阅关系时回调
*
* @param subscription 发布订阅关系对象
*/
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("建立订阅关系");
this.subscription = subscription;
this.subscription.request(1);
}
/**
* 订阅者每次处理数据时回调
*
* @param item 要处理的数据
*/
@Override
public void onNext(String item) {
// 这里为了观察效果,手动让线程睡眠1秒
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("接收数据:{}", item);
// 数据处理完后,再次请求1条数据
this.subscription.request(1);
}
/**
* 发生异常时回调
*
* @param throwable 异常对象
*/
@Override
public void onError(Throwable throwable) {
log.info("程序出错");
this.subscription.cancel();
}
/**
* 数据全部处理完成后回调
*/
@Override
public void onComplete() {
log.info("程序完成");
}
}
上面代码我们使用了Flow.Subscriber接口,它具有4个方法需要实现,最重要的就是onNext方法,其中包含我们的业务逻辑,这里我们为了观察效果手动让线程睡眠1秒,模拟消息消费速度低于发布速度的场景。注意onNext方法最后我们调用了subscription.request()方法,该方法表示请求到指定条消息后再继续处理。
其余具体见代码注释,这里就不赘述了。
Main.java
package com.gacfox.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@Slf4j
public class Main {
public static void main(String[] args) {
// 创建发布者
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
// 创建订阅者
Flow.Subscriber<String> subscriber = new ReactiveSubscriber();
// 建立发布订阅关系
publisher.subscribe(subscriber);
// 创建订阅者
Flow.Subscriber<String> subscriber2 = new ReactiveSubscriber();
// 建立发布订阅关系
publisher.subscribe(subscriber2);
// 发布数据
for (int i = 0; i < 1000; i++) {
publisher.submit("data" + i);
log.info("发送消息" + "data" + i);
}
// 发布者发布结束
publisher.close();
// 阻塞主线程防止程序退出
Thread.currentThread().join();
} catch (Exception e) {
log.error("error: ", e);
}
}
}
上面代码中,我们先实例化了JDK9自带的SubmissionPublisher,注意该对象实现了AutoCloseable接口,因此这里我们使用了try-with-resource写法;然后又创建了我们自己编写的ReactiveSubscriber对象,并建立了订阅关系,发布者陆续发布1000条消息后,结束发布。
实际运行时,我们会发现发布者在发布256条消息后就阻塞了,因为此时发布速度远大于消费速度,缓冲区已满,这就是之前所说的Back Pressure机制。最后随着消费完成后,订阅者的onComplete()方法被回调。
处理器
FlowAPI中,如果我们需要在发布者和订阅者之间做一些转换操作,就需要用到Processor处理器,这个和JDK8中StreamAPI的流数据处理思想有些类似。Processor既是订阅者也是发布者,承担了一个流处理和转换的角色,下面例子中我们实现了一个简单的处理器,将字符串消息转化为了大写字母。
ReactiveProcessor.java
package com.gacfox.demo;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
@Slf4j
public class ReactiveProcessor extends SubmissionPublisher<String> implements Flow.Processor<String, String> {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("建立订阅关系");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String item) {
log.info("处理数据:{}", item);
this.submit(item.toUpperCase());
this.subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
log.info("程序出错");
this.subscription.cancel();
}
@Override
public void onComplete() {
log.info("程序完成");
}
}
Main.java
// 创建发布者和处理器
try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
ReactiveProcessor processor = new ReactiveProcessor()) {
// 创建订阅者
Flow.Subscriber<String> subscriber = new ReactiveSubscriber();
// 建立发布订阅关系
publisher.subscribe(processor);
processor.subscribe(subscriber);
// 后续发布消息等业务逻辑...
}
代码中,我们的处理器订阅了发布者,而订阅者订阅了处理器,这个很容易理解,这里就不过多解释了。最终运行时,我们会发现订阅者收到的数据已经被转换为了大写字母。