SpringWebFlux中提供了WebClient非阻塞HTTP客户端库,它支持异步和流式处理,适用于构建高性能、高并发的应用,是SpringMVC中RestTemplate的替代。这篇笔记我们对SpringWebFlux中WebClient的使用进行简单介绍。
创建WebClient可以使用静态方法create()
或使用该类提供的Builder模式,前者返回包含默认配置的WebClient,后者则可以让我们更精细的控制WebClient的各项配置。
// 默认配置
WebClient client = WebClient.create();
// 自定义配置(推荐)
WebClient client = WebClient.builder()
.baseUrl("http://localhost:8080")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
不过在SpringWebFlux中,我们最好不要每次用WebClient时都重新创建一个实例,我们可以将通用的部分封装为Spring Bean,每次使用时通过依赖注入获取。下面例子代码是一个Java Config配置类,它创建了一个默认connectTimeout、responseTimeout为5秒,基础路径为http://localhost:8080
的WebClient实例,并使用@Bean
注解注册为Spring Bean。
package com.gacfox.demo.config;
import io.netty.channel.ChannelOption;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;
import java.time.Duration;
@Configuration
public class WebClientConfig {
@Bean
public WebClient webClient(WebClient.Builder builder) {
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofMillis(5000));
return builder
.clientConnector(new ReactorClientHttpConnector(httpClient))
.baseUrl("http://localhost:8080")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
}
connectTimeout
:建立连接的超时时间responseTimeout
:等待服务器响应的超时时间,即客户端从发送请求到收到响应头的最大时间下面代码演示了如何使用WebClient发起GET请求。我们请求的路径是http://localhost:8080/student/getStudentById?id=1
,它返回查询到的学生信息,我们的控制器方法根据返回值输出查询成功还是失败的消息。
package com.gacfox.demo.controller;
import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.StudentDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
@Resource
private WebClient webClient;
@GetMapping("/hello")
public Mono<ApiResult<?>> hello() {
return webClient
.get()
.uri("/student/getStudentById?id=1")
.retrieve()
.bodyToMono(new ParameterizedTypeReference<ApiResult<StudentDto>>() {
}).map(result -> {
if ("0".equals(result.getCode())) {
return ApiResult.success("查询成功,学生: " + result.getData().getName());
} else {
return ApiResult.failure("查询失败");
}
});
}
}
代码中我们使用了一系列的方法来配置WebClient如何发起HTTP请求,get()
方法设置请求类型为GET,uri()
指定了查询的路径,这个路径会基于前面配置类中指定的基础路径来拼接;retrieve()
方法用于定义如何解析响应,其中bodyToMono()
指定将响应转为Mono,它的参数是响应体反序列化的类型(默认基于JSON反序列化),这里可以传入Class类型的参数,但由于我们的类型带有泛型,因此使用ParameterizedTypeReference
类型。最终,我们基于Mono继续响应式流的配置,我们根据响应的code
属性设置控制器方法的返回值。
不过实际开发中情况可能更加复杂,由于网络或上游服务可能存在不稳定的情况,我们最好额外处理服务端返回错误HTTP状态码的情形。WebClient中,我们可以直接使用HttpStatus::is4xxClientError
和HttpStatus::is5xxServerError
判断这些情况。
return webClient
.get()
.uri("/student/getStudentById?id=1")
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException("服务请求失败,资源不存在或没有权限")))
.onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new RuntimeException("服务请求失败,服务端异常")))
.bodyToMono(new ParameterizedTypeReference<ApiResult<StudentDto>>() {
}).map(result -> {
if ("0".equals(result.getCode())) {
return ApiResult.success("查询成功,学生: " + result.getData().getName());
} else {
return ApiResult.failure("查询失败");
}
});
上面代码中,我们直接使用Mono.error()
抛出错误,将其交给统一异常处理器处理。
通过这个例子我们可以看到,WebClient也是一个完全Reactive的组件,它与SpringWebFlux配合使用非常适合实现完全异步非阻塞的微服务程序。
前面代码中,我们对于客户端请求的路径是使用字符串自己拼接的,WebClient构建请求时,uri()
方法还支持基于UriBuilder
的重载,使用它构建请求路径更灵活和可靠,下面是一个例子。
return webClient
.get()
.uri(uriBuilder -> uriBuilder
.path("/student/getStudentById")
.queryParam("id", 1)
.build())
.retrieve()
.bodyToMono(new ParameterizedTypeReference<ApiResult<StudentDto>>() {
}).map(result -> {
if ("0".equals(result.getCode())) {
return ApiResult.success("查询成功,学生: " + result.getData().getName());
} else {
return ApiResult.failure("查询失败");
}
});
对于POST或PUT请求可能携带请求体,这可以在构建请求信息时使用bodyValue()
方法设置,默认情况下,请求体会自动使用Jackson进行JSON序列化。
package com.gacfox.demo.controller;
import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.StudentDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
@Resource
private WebClient webClient;
@GetMapping("/hello")
public Mono<ApiResult<?>> hello() {
StudentDto studentDto = StudentDto.builder()
.name("汤姆")
.age(18)
.build();
return webClient
.post()
.uri("/student/saveStudent")
.bodyValue(studentDto)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<ApiResult<?>>() {
}).map(result -> {
if ("0".equals(result.getCode())) {
return ApiResult.success("操作成功");
} else {
return ApiResult.failure("操作失败");
}
});
}
}
在SpringWebFlux的应用场景中,我们请求的上游很可能返回一个SSE流式响应,WebClient对这类响应也有良好的支持,我们只需要将接收的类型设置为MediaType.TEXT_EVENT_STREAM
,然后配合使用bodyToFlux()
方法就行了。下面例子调用了一个SSE接口,但代码并没有继续流式输出,而是使用了reduce()
操作符将响应规约到单个Mono对象上。
package com.gacfox.demo.controller;
import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.ChatEventResponseData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
@Resource
private WebClient webClient;
@GetMapping("/hello")
public Mono<ApiResult<?>> hello() {
return webClient
.get()
.uri("/chat")
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<ChatEventResponseData>>() {
})
.reduce(new StringBuilder(), (sb, value) -> {
if (value.data() != null) {
sb.append(value.data().getMessage());
}
return sb;
})
.map(sb -> ApiResult.success(sb.toString()));
}
}
代码中,我们使用了bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<ChatEventResponseData>>() { })
,它表示我们上游返回是是数据流,这个数据流可以用ServerSentEvent<ChatEventResponseData>
类型解析,WebClient会自动处理标准的SSE格式。bodyToFlux
会返回Flux对象,我们在其后使用了reduce
操作符,将响应的所有内容规约为一个StringBuilder
,最后将其转换为了通用响应体ApiResult
返回给客户端。
对于SSE连接的超时问题,一般来说,SSE是流式响应,服务器通常会立即返回响应头然后再逐步推送数据,如果responseTimeout
设置为5秒但SSE发送数据总共需要10秒,如果响应头正确在一开始就返回了,这其实并不会触发responseTimeout
超时。然而SSE数据流可能出现发送一半“卡住”的情况,我们仍要处理这种超时问题,避免连接迟迟不能释放。对于这种超时判断,一种方式是直接使用Flux的timeouot
操作符。
.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<ChatEventResponseData>>() {
})
.timeout(Duration.ofMillis(3000))
timeout
操作符的含义是当指定时间内没有元素被发射就抛出异常,它适合处理SSE发送超时的场景。
WebFilter支持过滤器功能,它可以为请求和响应添加一些通用的切面逻辑。下面例子代码在配置WebClient的阶段使用了filter()
方法配置了过滤器逻辑,它在请求发出前打印了一条日志信息。
@Bean
public WebClient webClient(WebClient.Builder builder) {
HttpClient httpClient = HttpClient.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.responseTimeout(Duration.ofMillis(5000));
return builder
.clientConnector(new ReactorClientHttpConnector(httpClient))
.baseUrl("http://localhost:8080")
.defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.filter((request, next) -> {
log.info("request: {}", request.url());
return next.exchange(request);
})
.build();
}