响应式流规范(Reactive Streams)是Java中的用于处理异步数据流的标准规范,该规范在不同的响应式库之间提供了一种标准化的异步流处理方式。ReactorCore是响应式流规范的一种实现,ReactorCore库也是Spring生态中集成的响应式编程框架,我们熟知的SpringWebFlux、Spring Cloud Gateway都是基于ReactorCore实现的,掌握ReactorCore的使用对理解这些组件至关重要。
本系列笔记我们将对ReactorCore的使用进行学习,不过这里我们不打算立刻引入SpringWebFlux,我们将创建一个普通Java工程并从响应式流规范和ReactorCore的基础概念开始介绍。
在学习ReactorCore之前我们必须先明白什么是响应式编程。响应式编程(Reactive Programming,也经常被翻译为“反应式”编程)是一种面向数据流和变化传播的编程范式。区别于我们最常用的“命令式”编程,响应式编程具有以下特点:
传统的“命令式”编程难道无法实现非阻塞并发逻辑吗?当然并不是这样!Java中,我们使用传统的流程控制语句配合多线程技术也可以实现任何我们想要的代码逻辑,包括复杂精细的并发流程控制,只不过这样写出来的代码可能非常复杂,与同样复杂的业务逻辑耦合后会更加难以维护和调试,这种代码通常被称为“失控”的代码,而响应式编程提出了一个相对“规范”的编程模型,简单来讲就是它并没简化业务逻辑而是换了种代码的写法,我们按照响应式编程的写法编写代码能够最大程度避免出现代码失控后续无人敢维护的情况。
总而言之,响应式编程是一种面向数据流驱动的声明式编程范式,常用于实时和高并发的IO密集型应用场景。响应式编程最早是由微软在.NET中实现的Rx.NET框架,后来这一概念也扩展到其它编程语言的生态中,衍生出了RxJS、RxJava、ReactorCore等框架。
响应式编程是一种编程范式,它没有具体规定要怎么实现,因此各种框架都给出了不尽相同的实现方式,而响应式流规范(Reactive Streams)则是后来从其中提炼出的一套标准的Java接口,响应式流规范和响应式编程两个概念并不完全等同。RxJava、ReactorCore等框架都早于响应式流规范出现,但响应式流规范提出后,RxJava、ReactorCore等框架的新版本都实现了该规范,使得我们可以用标准化的接口来使用这些框架。
前面我们介绍过,响应式编程是声明式的,我们需要组装一系列的组件来构建数据流的处理管线,Reactive Streams中就有下面这样的一些组件。
发布者(Publisher):Publisher是数据流的生产者,Publisher负责创建Subscription并生产数据流给Subscriber,这个数据流可能是无限的也可能是有限的。此外,Reactive Streams里数据的推送是按需的,只有Subscriber建立订阅关系并请求数据后才会生产数据,数据发送完成后会调用Subscriber的onComplete()
方法完成数据流的发送。
public interface Publisher<T> {
/**
* 该方法用于建立Publisher和Subscriber之间的订阅关系
* 这其实也是Subscription的工厂方法,每次调用时,Subscription被创建并传入Subscriber
* 每个Subscription只会用于一个Subscriber,一个Subscriber只应该订阅单个Publisher一次
* Publisher如果拒绝将抛出Throwable错误
*
* @param s 消费者实例
*/
public void subscribe(Subscriber<? super T> s);
}
订阅者(Subscriber):Subscriber负责消费数据并执行相应的业务逻辑对数据进行处理,Reactive Streams中,Subscriber必须先和Publisher建立订阅关系并调用Subscription的request(n)
来请求n
个数据整个生产消费流程才会启动,订阅者可以主动控制消费速率、丢弃或缓冲过多的数据来避免数据过载,这也被称为Reactive Streams中的背压(Backpressure)机制。数据流的完成信号是Publisher发出的,Subscriber只负责提供处理完成或错误的回调函数,不能主动“完成”数据流,但Subscriber可以调用Subscription的cancel()
方法取消订阅。
public interface Subscriber<T> {
/**
* 该方法调用Publisher的subscribe方法后会回调
* 回调后数据并不会主动送数据到Subscriber,通常这个方法内还需要调用Subscription的request()方法开始请求数据
*
* @param s 订阅关系实例
*/
public void onSubscribe(Subscription s);
/**
* Subscriber请求数据后,Publisher通过回调该方法向Subscriber发送数据
*
* @param t Subscriber接收到的数据
*/
public void onNext(T t);
/**
* 数据流发送中出现异常,此时数据流终止,调用Subscription的request()方法也不会再发送数据了
*
* @param t 异常对象
*/
public void onError(Throwable t);
/**
* 数据流发送完成,和onError()类似,调用Subscription的request()方法也不会再发送数据了
*/
public void onComplete();
}
订阅关系(Subscription):Subscription是订阅关系对象,它连接了Publisher和Subscriber两个组件。
public interface Subscription {
/**
* Subscriber调用该方法从Publisher请求n个数据(即允许Publisher最多发送n个数据)
*
* @param n 请求的数据个数
*/
public void request(long n);
/**
* Subscriber调用该方法取消订阅关系
*/
public void cancel();
}
处理器(Processor):Processor对象位于发布者和订阅者之间,Processor从发布者中接收消息,进行相应处理后再发给订阅者,因此实际上Processor既是订阅者也是发布者,承担了一个流处理和转换的角色。Processor是可选的,它在响应式流中不是必须存在的。此外,后面我们介绍的ReactorCore中由于提供了大量流操作符,这些API封装并替代了Processor的功能,因此Reactor中较少直接操作Processor。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
ReactorCore框架本身提供了许多API,但这些API都是上面4个接口的封装和抽象,我们掌握上面4个Reactive Streams的核心接口后就可以尝试使用ReactorCore了。
我们创建一个普通Java工程并引入如下Maven依赖即可使用ReactorCore。
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.34</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.4.34</version>
</dependency>
注意:虽然Reactive Streams规范是在Java9中实现为FlowAPI的,ReactorCore实现了Reactive Streams但ReactorCore也支持Java8版本,我们可以直接使用ReactorCore中的接口编写代码逻辑,因此使用Java8并不影响我们使用ReactorCore。
Reactive Streams理解起来其实是非常抽象的,我们这里直接以例子的形式介绍如何实现最简单的响应式流编程。我们实现这样一个简单的功能,Publisher生产Message 1
、Message 2
这样的消息,Subscriber订阅并接收消息,每次接收消息后都请求接下来的n个数据,Subscriber收到消息后打印到控制台上,直到数据流发送完成,结束整个处理流程。
DemoPublisher.java
package com.gacfox.demo;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
public class DemoPublisher implements Publisher<String> {
private final Queue<String> queue;
private boolean canceled = false;
private boolean completed = false;
class DemoSubscription implements Subscription {
private final Subscriber<? super String> subscriber;
public DemoSubscription(Subscriber<? super String> subscriber) {
this.subscriber = subscriber;
}
@Override
public void request(long n) {
for (int i = 0; i < n; i++) {
if (completed || canceled) return;
if (queue.isEmpty()) {
completed = true;
subscriber.onComplete();
return;
}
subscriber.onNext("Message " + queue.poll());
}
}
@Override
public void cancel() {
canceled = true;
}
}
public DemoPublisher(List<String> data) {
// 在队列中模拟插入一些数据
queue = new LinkedList<>();
queue.addAll(data);
}
@Override
public void subscribe(Subscriber<? super String> subscriber) {
DemoSubscription demoSubscription = new DemoSubscription(subscriber);
subscriber.onSubscribe(demoSubscription);
}
}
Publisher中subscribe()
方法主要实例化了内部的队列和Subscription,其中Subscription包含数据的生产逻辑,每次request()
方法被调用后将顺序的从队列中取出数据并生成Message 1
、Message 2
、Message <count>
字符串信息,当队列为空时,数据消费完成,completed
标志将被设置为true
,停止发送数据;如果调用了cancel()
取消订阅,则canceled
标志被设置为true
,也会停止发送数据。
DemoSubscriber.java
package com.gacfox.demo;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
public class DemoSubscriber implements Subscriber<String> {
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(String s) {
System.out.println("Received: " + s);
subscription.request(5);
}
@Override
public void onError(Throwable t) {
System.out.println("Error: " + t.getMessage());
}
@Override
public void onComplete() {
System.out.println("Completed!");
}
}
Subscriber中包含的4个方法都是回调方法,onSubscribe()
在订阅后回调,它的参数是Subscription实例,我们需要用它的request()
方法请求数据,方法内我们在订阅后立即请求了1条数据,onNext()
则处理了数据被发送到Subscriber后的处理逻辑,这里只是简单的将其打印到控制台上,并请求下5条数据。至于onError()
和onComplete()
回调方法则是用于处理数据流出错和数据流发送完成的逻辑,这里也是简单的打印信息。
Main.java
package com.gacfox.demo;
import java.util.Arrays;
public class Main {
public static void main(String[] args) {
DemoPublisher publisher = new DemoPublisher(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
DemoSubscriber subscriber = new DemoSubscriber();
publisher.subscribe(subscriber);
}
}
主函数中,我们实例化了Publisher和Subscriber,并调用了Publisher的subscribe()
方法实现订阅,数据流我们采用了一个固定的字符串列表。前面代码中,一旦订阅后Subscriber的onSubscribe()
就会回调,这个函数请求了第1条数据,数据发送到Subscriber后onNext()
会被回调,处理数据并请求下5条数据,这样整个数据流便启动了,直到数据流发送完成。
运行程序后,控制台输出信息如下。
Received: Message 1
Received: Message 2
Received: Message 3
Received: Message 4
Received: Message 5
Received: Message 6
Received: Message 7
Received: Message 8
Received: Message 9
Received: Message 10
Completed!
前面例子中,我们的程序里Subscriber的onComplete()
方法会在数据流发送完成时被回调,但如果是数据流出错的情况,我们需要回调Subscriber的onError()
函数,它需要传入一个Throwable
类型的错误对象,下面是一个例子。
@Override
public void request(long n) {
try {
for (int i = 0; i < n; i++) {
if (completed || canceled) return;
if (queue.isEmpty()) {
completed = true;
subscriber.onComplete();
return;
}
String s = queue.poll();
if ("5".equals(s)) {
// 假设这里因某种原因抛出了异常
throw new RuntimeException("Exception thrown!");
}
subscriber.onNext("Message " + s);
}
} catch (Exception e) {
// 这里捕获了异常,结束数据流,并回调subscriber.onError(e)
completed = true;
subscriber.onError(e);
}
}
这个数据流会在值等于5时假装“出错”,错误最终会被catch
子句捕获,此时我们同样需要停止数据流,因此设置completed
为true
停止发送数据,随后我们回调了Subscriber的onError()
函数并传入了错误对象。
前面例子中我们的Publisher比较简单,虽然代码实现了发布订阅模式,但它仍是单线程的。实际上,使用Reactive Streams能够很好的实现支持背压的异步并发编程,但涉及并发实现起来就相对复杂很多了,但我们还是要记住,无论在底层如何调度执行任务的线程,仍要遵循Reactive Streams的编程范式,即request()
请求数据,onNext()
、onComplete()
需要按照正确的先后顺序调用,这一点不能变。
下面例子中,我们的Publisher支持通过submit()
方法提交数据以及通过close()
方法关闭数据流,它使用线程池提供的线程资源异步的执行任务。
package com.gacfox.demo;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class DemoPublisher implements Publisher<String> {
private final ExecutorService executorService;
private final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private DemoSubscription subscription;
public DemoPublisher(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
public void subscribe(Subscriber<? super String> subscriber) {
this.subscription = new DemoSubscription(subscriber, executorService);
subscriber.onSubscribe(subscription);
}
public void submit(String item) {
if (closed.get()) {
throw new IllegalStateException("Publisher is closed");
}
queue.offer(item);
subscription.drain();
}
public void close() {
if (closed.compareAndSet(false, true)) {
subscription.drain();
}
}
private class DemoSubscription implements Subscription {
private final Subscriber<? super String> subscriber;
private final ExecutorService executorService;
private final AtomicLong requested = new AtomicLong(0);
private final AtomicBoolean cancelled = new AtomicBoolean(false);
private final AtomicBoolean draining = new AtomicBoolean(false);
DemoSubscription(Subscriber<? super String> subscriber, ExecutorService executorService) {
this.subscriber = subscriber;
this.executorService = executorService;
}
@Override
public void request(long n) {
requested.addAndGet(n);
drain();
}
@Override
public void cancel() {
if (cancelled.compareAndSet(false, true)) {
requested.set(0);
queue.clear();
}
}
void drain() {
if (cancelled.get() || draining.get() || executorService == null) {
return;
}
if (draining.compareAndSet(false, true)) {
executorService.execute(() -> {
try {
while (requested.get() > 0) {
String item = queue.poll();
if (item == null) {
break;
}
try {
this.subscriber.onNext(item);
} catch (Throwable t) {
subscriber.onError(t);
cancel();
break;
}
requested.decrementAndGet();
}
// 检查流是否关闭,如果关闭则调用onComplete()
if (closed.get() && queue.isEmpty() && !cancelled.get()) {
subscriber.onComplete();
cancel();
}
} finally {
draining.set(false);
}
});
}
}
}
}
DemoPublisher
代码中,我们使用了一个ConcurrentLinkedQueue
来存储提交的数据,一旦数据流需要关闭,closed
标志会被设置为true
,数据流也会随之关闭。
DemoSubscription
中,requested
用来记录当前请求的数据量,cancelled
会在取消订阅时被设置为true
。draining
标志非常重要,它表示数据流是否正在处理中,我们的drain()
函数代码中通过CAS检查if (draining.compareAndSet(false, true)) { }
保证只有一个线程处理数据流,如果已经有线程在处理数据,其他线程会直接返回。
package com.gacfox.demo;
import java.util.concurrent.*;
public class Main {
public static void main(String[] args) {
// 用于执行任务的线程池
ExecutorService executorService = new ThreadPoolExecutor(
10,
30,
60L, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(100),
Executors.defaultThreadFactory()
);
// 创建Publisher、Subscriber并建立订阅关系
DemoPublisher publisher = new DemoPublisher(executorService);
DemoSubscriber subscriber = new DemoSubscriber();
publisher.subscribe(subscriber);
// 向Publisher提交数据
for (int i = 1; i <= 100; i++) {
publisher.submit(String.valueOf(i));
}
// 关闭Publisher
publisher.close();
}
}
Main
类中,我们首先创建了线程池,然后创建Publisher、Subscriber并建立订阅关系,随后我们向Publisher提交数据,最后关闭了Publisher。如果一切正常,我们可以看到Subscriber在一个不同于主线程的线程中处理了100条数据,并打印了完成的提示。
前面代码我们实现了自己的Publisher和Subscriber,这些Demo代码写的其实比较简陋,它们缺乏参数检查而且功能十分有限,在使用ReactorCore框架时自己实现Publisher和Subscriber在ReactorCore框架中也不是推荐的方式,这里仅仅是为了演示Reactive Streams底层的代码逻辑,我们了解其原理即可。ReactorCore围绕Flux和Mono这两个核心类提供了大量的API来声明式的创建发布和订阅逻辑,我们将在后续章节详细介绍Flux和Mono的使用。