SpringWebFlux简介
SpringWebFlux是Spring5中引入的一套采用异步非阻塞模型的Web服务端框架。SpringWebFlux基于ReactorCore响应式编程框架实现,它能够基于Netty运行也能够基于异步Servlet容器运行,实现高并发服务端程序。SpringCloud生态中的SpringCloudGateWay微服务网关也是基于SpringWebFlux实现的。
这篇笔记我们将基于SpringBoot工程介绍SpringWebFlux框架的使用,注意在学习这些内容前,我们需要先掌握Spring框架相关内容以及ReactorCore的使用,否则难以用好SpringWebFlux。
SpringWebFlux中的基本概念
多线程模型 vs 异步非阻塞模型
对于Web服务端开发来说,并发模型主要有传统的多线程模型(Request per Thread)和后来逐渐流行起来的异步非阻塞模型。基于传统JavaEE Servlet API的框架(例如Struts2、SpringMVC等)都属于前者,Tomcat、Undertow等Servlet容器提供了基于线程池的多线程并发支持,每个请求对应于一个HTTP处理线程,开发中具体业务代码通常采用同步阻塞式的编程方式开发,其优点是使用简单、开发调试方便,是开发服务端程序的主流方式。然而,一旦我们的服务端程序涉及高并发、长连接、高实时性要求,这种并发模型就不太够用了。
我们知道线程是操作系统提供的相对“重量级”的一种资源,JVM中的线程也对应于操作系统的线程,服务端采用多线程模型时,HTTP处理线程经常需要阻塞等待网络或磁盘IO等操作,而单纯的增加线程池的线程数也不可取,因为创建更多线程就要消耗更多内存,这种并发模型对线程资源的利用不充分,同时也会给线程调度带来更大压力。
异步非阻塞并发模型天生适合IO密集型的高并发服务端程序编写,在这种并发模型下业务逻辑被更细粒度的拆分和调度,因此能够充分利用操作系统的线程资源,以更少的内存换取更大的并发数,非常适合用来实现高性能网关、接口的聚合层这类组件。当然,它的缺点是代码扭曲,难写、难调试。
此外,我们还要了解的一点是如果使用SpringWebFlux就必须(或者说尽最大可能的)整个工程都是Reactive的,千万不能再用阻塞式API,例如传统的JDBC、MyBatis、阻塞式的Redis客户端等。这类同步式的API在SpringWebFlux中很容易被误用,导致绝对不应被阻塞的线程被错误的阻塞了,造成服务端性能的急剧下降。Spring实际上提供了整套的Reactive方案供我们配合SpringWebFlux使用,包括基于异步模型的HTTP客户端WebClient,异步的关系型数据库访问框架R2DBC,非阻塞的SpringData Redis Reactive等。如果我们的一个工程打算采用SpringWebFlux,那么就需要全套的Reactive方案。

SpringWebFlux和SpringMVC对比
SpringWebFlux并不是一个可以替代SpringMVC的方案,它是一个有着明确适用场景的框架。SpringWebFlux的优点是能够实现高并发服务,在同样的硬件条件和副本数下能够提供比SpringMVC更高的连接数,此外结合采用了异步并发模型的HTTP客户端、数据库访问框架等,能够使我们的服务整体上就是一个高并发的架构。然而,SpringWebFlux的缺点也不能忽视。
- 使用SpringWebFlux就必须整个工程都使用异步编程模型来开发,传统的同步式IO、HTTP客户端、JDBC、以及一些其它同步阻塞模式的API都不可以使用。
- 响应式编程写出的代码是“扭曲”的,它不同于传统的同步阻塞式代码,对一些新手来说理解并掌握响应式编程不是一件容易的事情,尤其是要编写一些复杂业务逻辑的时候,它具有很高的使用门槛。
- 不同于同步阻塞式代码所有操作都在一个线程里,异步并发模型编写的代码难以调试。
- Java生态中有大量依赖于同步阻塞式模型的组件,比如很多日志框架中都有的基于
ThreadLocal的MDC,如果放到异步模型下这些组件就无法直接使用了。
什么时候需要SpringWebFlux
使用SpringWebFlux开发时心智负担非常高,调试也非常困难。如果你的情况是听说SpringWebFlux具有“更好的性能”,于是决定在下一个业务场景中使用它,那大概率是个巨大的错误。SpringWebFlux的应用领域明确且狭窄,它适用于以下场景:
- 高并发API网关:SpringWebFlux非常适合作为微服务的接口聚合层,例如SpringCloudGateway就是SpringWebFlux实现的
- 高频实时消息推送:这类应用需要大量客户端能与服务端维持长连接,存在大量高频且要求实时性的小数据包交互,例如实时交易信息推送、监控大屏、IoT网关、游戏服务端等
- 实时流媒体分发服务:这类应用场景有持续的数据流传输,具有低延迟、高吞吐的要求
总而言之,SpringWebFlux适合场景的特点是高并发、低延迟和简单业务逻辑。如果你要开发的程序不在此列,在对其不甚了解的情况下“误用”了SpringWebFlux,那就是件痛苦和悲伤的事情了。你会发现复杂的业务逻辑难以表达,代码可读性极低,甚至因为错误的使用了SpringWebFlux,导致程序的并发性能也极低,最后陷入难以收拾的局面。
使用SpringWebFlux
这里我们基于SpringBoot工程,使用SpringWebFlux、R2DBC和MySQL数据库编写一个简单的数据查询例子,关于R2DBC的使用我们将在单独的章节介绍,这里不过多解释,此外这个例子假设你已经非常熟悉SpringBoot,并能正确使用ReactorCore库操作响应式数据流,否则可能难以看懂。
引入Maven依赖
首先我们需要引入SpringWebFlux依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
此外由于我们还需要操作数据,我们这里选择使用R2DBC,因此需要引入R2DBC依赖和MySQL的Reactive驱动。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>dev.miku</groupId>
<artifactId>r2dbc-mysql</artifactId>
<version>0.8.2.RELEASE</version>
</dependency>
添加数据库配置
我们在application.properties中添加MySQL数据库的连接信息。
spring.r2dbc.url=r2dbc:mysql://127.0.0.1:3306/netstore?characterEncoding=UTF-8&serverTimezone=GMT%2B8
spring.r2dbc.username=root
spring.r2dbc.password=root
编写SpringWebFlux代码
SpringWebFlux工程的目录结构组织上可以沿用SpringMVC的方式,我们的代码目录结构大致如下。
demo-webflux
|_ src/main/java
|_ com.gacfox.demo
|_ model
|_ ApiResult.java # 通用响应体封装类
|_ Customer.java # 客户实体类
|_ CustomerDto.java # 客户DTO类
|_ controller
|_ CustomerController.java # 控制器
|_ service
|_ CustomerService.java # 服务层Bean
|_ repository
|_ CustomerRepository.java # R2DBC数据查询接口
|_ DemoWebfluxApplication.java # 启动类
|_ src/main/resources
|_ application.properties # SpringBoot配置文件
Customer.java
package com.gacfox.demo.model;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.relational.core.mapping.Column;
import org.springframework.data.relational.core.mapping.Table;
@Data
@Table("t_customer")
public class Customer {
@Id
@Column("id")
private Long id;
@Column("name")
private String name;
@Column("phone")
private String phone;
}
客户实体类中,我们使用R2DBC的注解标注了实体类的表名、主键和字段名。和SpringDataJPA不同,R2DBC不能使用JPA注解,我们需要用SpringDataJDBC的注解。
CustomerRepository.java
package com.gacfox.demo.repository;
import com.gacfox.demo.model.Customer;
import org.springframework.data.r2dbc.repository.R2dbcRepository;
public interface CustomerRepository extends R2dbcRepository<Customer, Long> {
}
Repository接口中,我们继承了R2dbcRepository<Customer, Long>接口,它会自动映射许多数据表查询的实用方法。注意SpringWebFlux中一切都是Reactive的数据流,因此返回值只能是Mono或Flux,例如查询全部数据的findAll()方法返回的就是Flux。
CustomerService.java
package com.gacfox.demo.service;
import com.gacfox.demo.model.CustomerDto;
import com.gacfox.demo.repository.CustomerRepository;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import javax.annotation.Resource;
@Service("customerService")
public class CustomerService {
@Resource
private CustomerRepository customerRepository;
public Flux<CustomerDto> queryCustomerList() {
return customerRepository.findAll()
.map((o) -> CustomerDto.builder()
.id(o.getId())
.name(o.getName())
.phone(o.getPhone())
.build());
}
}
在Service层中,我们调用了Repository查询了数据,它的返回值类型是实体类的Flux。不过直接将实体类对象返回给Controller层不是一种好的开发方式,实际开发中我们在Service层返回的都是DTO,因此这里我们使用map操作符将实体对象映射为DTO对象,最终再返回给上一层。
CustomerController.java
package com.gacfox.demo.controller;
import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.CustomerDto;
import com.gacfox.demo.service.CustomerService;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.util.List;
@RestController
@RequestMapping("/api/v1/customer")
public class CustomerController {
@Resource
private CustomerService customerService;
@GetMapping("/queryCustomerList")
public Mono<ApiResult<List<CustomerDto>>> queryCustomerList() {
return customerService.queryCustomerList()
.collectList()
.map(ApiResult::success);
}
}
Controller层中,我们调用了Service层查询数据,但这里我们还想将返回数据封装到通用响应体ApiResult类内,它是单个对象,因此我们需要使用reduce操作符将Flux聚合为Mono,不过ReactorCore提供了collectList操作符,我们实际不需要自己reduce,调用collectList()后数据流内的数据将被转为List列表,最后我们调用map操作符将List封装到ApiResult。此时我们已实现了一个典型的SpringWebFlux程序。
配置BlockHound
BlockHound是Project Reactor提供的一款工具,它可以检测到我们程序代码中对反应式写法和阻塞式写法之间的混淆和误用。由于正确使用Reactive反应式编程是极其困难的,如果你的团队刚刚接触这种开发范式,建议一定要配置BlockHound,避免团队成员大量编写错误代码,最终服务性能急剧下降却又积重难返无法修改的情况出现。
在Maven的pom.xml中引入如下依赖。
<dependency>
<groupId>io.projectreactor.tools</groupId>
<artifactId>blockhound</artifactId>
<version>1.0.11.RELEASE</version>
</dependency>
配置BlockHound最简单的方法是在SpringBoot的启动类中调用install()方法,它在全局范围内开启了BlockHound的检测功能。如果不想全局启用仅想在某些包名下启用,BlockHound也是支持配置的,具体可以参考相关文档。
package com.gacfox.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import reactor.blockhound.BlockHound;
@SpringBootApplication
public class DemoWebfluxApplication {
public static void main(String[] args) {
String profile = System.getProperty("spring.profiles.active");
if ("dev".equals(profile)) {
BlockHound.install();
}
SpringApplication.run(DemoWebfluxApplication.class, args);
}
}
代码中我们仅在SpringBoot的环境为dev下开启BlockHound(不要在生产环境中使用),此时我们编写的阻塞式操作都会被BlockHound发现并抛出像下面这样的异常信息。
reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Thread.sleep
关于SpringMVC和SpringWebFlux共存的情况
实际上,当前版本的SpringMVC和SpringWebFlux可以共存,如下配置在SpringBoot2.7下是可以正常启动的,在该配置下服务默认使用Tomcat运行,SpringWebFlux此时会以的兼容方式运行在Servlet3.1+的异步模式下,此时你能在服务端代码中同时使用普通的阻塞式编程模型和Reactive模型。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
虽然这种做法在技术上是可以的,但目前普遍认为这并非是一个好的方案,在Tomcat中SpringWebFlux并非如Netty那样真正基于事件循环实现的非阻塞IO,可能无法充分发挥SpringWebFlux的优势,反而带来了Reactive编程的额外复杂性,同时使用两种编程模型时杂糅的异步适配会增加代码维护难度,开发成本更高。如果你已决定基于SpringWebFlux开发,强烈建议使用Netty或其它真正基于事件驱动的服务器框架来实现,这样才能真正发挥SpringWebFlux设计上的优点。
如何确定当前应用程序是运行在Tomcat上的还是Netty上的?当你启动服务时看到类似以下输出时,它是运行在Tomcat上的。
Tomcat started on port(s): 8080 (http) with context path ''
如果输出以下内容,则是运行在Netty上的。
Netty started on port 8080