WebClient客户端

SpringWebFlux中提供了WebClient非阻塞HTTP客户端库,它支持异步和流式处理,适用于构建高性能、高并发的应用,是SpringMVC中RestTemplate的替代。这篇笔记我们对SpringWebFlux中WebClient的使用进行简单介绍。

WebClient基本使用

创建WebClient可以使用静态方法create()或使用该类提供的Builder模式,前者返回包含默认配置的WebClient,后者则可以让我们更精细的控制WebClient的各项配置。

// 默认配置
WebClient client = WebClient.create();

// 自定义配置(推荐)
WebClient client = WebClient.builder()
    .baseUrl("http://localhost:8080")
    .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
    .build();

不过在SpringWebFlux中,我们最好不要每次用WebClient时都重新创建一个实例,我们可以将通用的部分封装为Spring Bean,每次使用时通过依赖注入获取。下面例子代码是一个Java Config配置类,它创建了一个默认connectTimeout、responseTimeout为5秒,基础路径为http://localhost:8080的WebClient实例,并使用@Bean注解注册为Spring Bean。

package com.gacfox.demo.config;

import io.netty.channel.ChannelOption;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.netty.http.client.HttpClient;

import java.time.Duration;

@Configuration
public class WebClientConfig {
    @Bean
    public WebClient webClient(WebClient.Builder builder) {
        HttpClient httpClient = HttpClient.create()
                .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
                .responseTimeout(Duration.ofMillis(5000));
        return builder
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .baseUrl("http://localhost:8080")
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }
}
  • connectTimeout:建立连接的超时时间
  • responseTimeout:等待服务器响应的超时时间,即客户端从发送请求到收到响应头的最大时间

下面代码演示了如何使用WebClient发起GET请求。我们请求的路径是http://localhost:8080/student/getStudentById?id=1,它返回查询到的学生信息,我们的控制器方法根据返回值输出查询成功还是失败的消息。

package com.gacfox.demo.controller;

import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.StudentDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
    @Resource
    private WebClient webClient;

    @GetMapping("/hello")
    public Mono<ApiResult<?>> hello() {
        return webClient
                .get()
                .uri("/student/getStudentById?id=1")
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<ApiResult<StudentDto>>() {
                }).map(result -> {
                    if ("0".equals(result.getCode())) {
                        return ApiResult.success("查询成功,学生: " + result.getData().getName());
                    } else {
                        return ApiResult.failure("查询失败");
                    }
                });
    }
}

代码中我们使用了一系列的方法来配置WebClient如何发起HTTP请求,get()方法设置请求类型为GET,uri()指定了查询的路径,这个路径会基于前面配置类中指定的基础路径来拼接;retrieve()方法用于定义如何解析响应,其中bodyToMono()指定将响应转为Mono,它的参数是响应体反序列化的类型(默认基于JSON反序列化),这里可以传入Class类型的参数,但由于我们的类型带有泛型,因此使用ParameterizedTypeReference类型。最终,我们基于Mono继续响应式流的配置,我们根据响应的code属性设置控制器方法的返回值。

不过实际开发中情况可能更加复杂,由于网络或上游服务可能存在不稳定的情况,我们最好额外处理服务端返回错误HTTP状态码的情形。WebClient中,我们可以直接使用HttpStatus::is4xxClientErrorHttpStatus::is5xxServerError判断这些情况。

return webClient
        .get()
        .uri("/student/getStudentById?id=1")
        .retrieve()
        .onStatus(HttpStatus::is4xxClientError, response -> Mono.error(new RuntimeException("服务请求失败,资源不存在或没有权限")))
        .onStatus(HttpStatus::is5xxServerError, response -> Mono.error(new RuntimeException("服务请求失败,服务端异常")))
        .bodyToMono(new ParameterizedTypeReference<ApiResult<StudentDto>>() {
        }).map(result -> {
            if ("0".equals(result.getCode())) {
                return ApiResult.success("查询成功,学生: " + result.getData().getName());
            } else {
                return ApiResult.failure("查询失败");
            }
        });

上面代码中,我们直接使用Mono.error()抛出错误,将其交给统一异常处理器处理。

通过这个例子我们可以看到,WebClient也是一个完全Reactive的组件,它与SpringWebFlux配合使用非常适合实现完全异步非阻塞的微服务程序。

构建请求URI

前面代码中,我们对于客户端请求的路径是使用字符串自己拼接的,WebClient构建请求时,uri()方法还支持基于UriBuilder的重载,使用它构建请求路径更灵活和可靠,下面是一个例子。

return webClient
        .get()
        .uri(uriBuilder -> uriBuilder
                .path("/student/getStudentById")
                .queryParam("id", 1)
                .build())
        .retrieve()
        .bodyToMono(new ParameterizedTypeReference<ApiResult<StudentDto>>() {
        }).map(result -> {
            if ("0".equals(result.getCode())) {
                return ApiResult.success("查询成功,学生: " + result.getData().getName());
            } else {
                return ApiResult.failure("查询失败");
            }
        });

传入请求体

对于POST或PUT请求可能携带请求体,这可以在构建请求信息时使用bodyValue()方法设置,默认情况下,请求体会自动使用Jackson进行JSON序列化。

package com.gacfox.demo.controller;

import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.StudentDto;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
    @Resource
    private WebClient webClient;

    @GetMapping("/hello")
    public Mono<ApiResult<?>> hello() {
        StudentDto studentDto = StudentDto.builder()
                .name("汤姆")
                .age(18)
                .build();

        return webClient
                .post()
                .uri("/student/saveStudent")
                .bodyValue(studentDto)
                .retrieve()
                .bodyToMono(new ParameterizedTypeReference<ApiResult<?>>() {
                }).map(result -> {
                    if ("0".equals(result.getCode())) {
                        return ApiResult.success("操作成功");
                    } else {
                        return ApiResult.failure("操作失败");
                    }
                });
    }
}

处理SSE流式返回信息

在SpringWebFlux的应用场景中,我们请求的上游很可能返回一个SSE流式响应,WebClient对这类响应也有良好的支持,我们只需要将接收的类型设置为MediaType.TEXT_EVENT_STREAM,然后配合使用bodyToFlux()方法就行了。下面例子调用了一个SSE接口,但代码并没有继续流式输出,而是使用了reduce()操作符将响应规约到单个Mono对象上。

package com.gacfox.demo.controller;

import com.gacfox.demo.model.ApiResult;
import com.gacfox.demo.model.ChatEventResponseData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;

import javax.annotation.Resource;

@Slf4j
@RestController
@RequestMapping("/demo")
public class DemoController {
    @Resource
    private WebClient webClient;

    @GetMapping("/hello")
    public Mono<ApiResult<?>> hello() {
        return webClient
                .get()
                .uri("/chat")
                .accept(MediaType.TEXT_EVENT_STREAM)
                .retrieve()
                .bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<ChatEventResponseData>>() {
                })
                .reduce(new StringBuilder(), (sb, value) -> {
                    if (value.data() != null) {
                        sb.append(value.data().getMessage());
                    }
                    return sb;
                })
                .map(sb -> ApiResult.success(sb.toString()));
    }
}

代码中,我们使用了bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<ChatEventResponseData>>() { }),它表示我们上游返回是是数据流,这个数据流可以用ServerSentEvent<ChatEventResponseData>类型解析,WebClient会自动处理标准的SSE格式。bodyToFlux会返回Flux对象,我们在其后使用了reduce操作符,将响应的所有内容规约为一个StringBuilder,最后将其转换为了通用响应体ApiResult返回给客户端。

对于SSE连接的超时问题,一般来说,SSE是流式响应,服务器通常会立即返回响应头然后再逐步推送数据,如果responseTimeout设置为5秒但SSE发送数据总共需要10秒,如果响应头正确在一开始就返回了,这其实并不会触发responseTimeout超时。然而SSE数据流可能出现发送一半“卡住”的情况,我们仍要处理这种超时问题,避免连接迟迟不能释放。对于这种超时判断,一种方式是直接使用Flux的timeouot操作符。

.bodyToFlux(new ParameterizedTypeReference<ServerSentEvent<ChatEventResponseData>>() {
})
.timeout(Duration.ofMillis(3000))

timeout操作符的含义是当指定时间内没有元素被发射就抛出异常,它适合处理SSE发送超时的场景。

使用过滤器

WebFilter支持过滤器功能,它可以为请求和响应添加一些通用的切面逻辑。下面例子代码在配置WebClient的阶段使用了filter()方法配置了过滤器逻辑,它在请求发出前打印了一条日志信息。

@Bean
public WebClient webClient(WebClient.Builder builder) {
    HttpClient httpClient = HttpClient.create()
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
            .responseTimeout(Duration.ofMillis(5000));
    return builder
            .clientConnector(new ReactorClientHttpConnector(httpClient))
            .baseUrl("http://localhost:8080")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .filter((request, next) -> {
                log.info("request: {}", request.url());
                return next.exchange(request);
            })
            .build();
}
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap