我们知道Spring实现了事件机制,它能够实现Event事件对象的发布订阅功能,实现低耦合、无侵入式的业务逻辑处理,也可以实现异步处理等高级功能,达到业务逻辑解耦的目标。然而扩展到微服务环境,原本Spring的事件机制在微服务环境下不能跨服务发布消息,此时我们自然就会想到需要借助消息队列实现该功能,不过SpringCloud Bus已经帮我们把这些功能都封装好了。
SpringCloud Bus底层依赖消息队列来实现跨服务的消息传递,目前主要支持RabbitMQ和Kafka两种消息中间件。这里出于简单起见,我们直接使用Docker启动RabbitMQ实例作为开发环境,生产环境则可能需要搭建RabbitMQ集群,有关RabbitMQ的知识可以参考相关章节,这里不过多涉及。
docker run -id --name=rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq:management
启动后,我们可以在15672
端口访问RabbitMQ的网页版控制台,我们的程序则需要通过5672
端口用AMQP协议连接RabbitMQ。
在需要收发消息的服务中,我们需要添加spring-cloud-starter-bus-amqp
依赖,它实现了基于RabbitMQ的消息机制。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>
此外我们还需要在application.properties
中配置RabbitMQ的连接信息。
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
SpringCloud Bus中,对于自定义事件需要继承RemoteApplicationEvent
类,下面是一个例子。
package com.gacfox.demo.product.event;
import lombok.Getter;
import lombok.Setter;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;
@Getter
@Setter
public class DemoRemoteApplicationEvent extends RemoteApplicationEvent {
private String data;
public DemoRemoteApplicationEvent() {
super();
}
public DemoRemoteApplicationEvent(Object source, String originService, String data) {
super(source, originService, () -> "**");
this.data = data;
}
}
代码中我们自定义了一个事件类DemoRemoteApplicationEvent
,该类包含了一个data
属性,它是我们自定义的数据字段,该数据随着事件对象从事件的发出端经过MQ传递到事件的消费端,这一过程中SpringCloud Bus会默认使用JSON格式传递,因此我们还要额外保证我们的事件对象能够正确被Jackson库进行序列化和反序列化。
另外事件对象还有两个重要的属性,originService
和destination
,分别代表事件的发出端服务和事件的目的地址,这里我们的发出端服务由外部获取并传入,事件的目的地址是通配符**
,即所有服务都会收到消息。
创建好事件对象后,我们还需要将其配置到工程中,我们可以在启动类或配置类上添加注解来配置。
package com.gacfox.demo.order;
import com.gacfox.demo.product.event.DemoRemoteApplicationEvent;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients(basePackages = {"com.gacfox.demo.product.api"})
@RemoteApplicationEventScan(basePackageClasses = DemoRemoteApplicationEvent.class)
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
我们使用了@RemoteApplicationEventScan
注解并指定了自定义事件类,这里注意在事件发出和接收的两端都需要进行该配置。
下面代码分别在两个微服务中发送和接收消息,其中product
服务负责发送,order
服务负责接收。
发送消息例子代码如下。
package com.gacfox.demo.product.controller;
import com.gacfox.demo.product.event.DemoRemoteApplicationEvent;
import org.springframework.cloud.bus.BusProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("/api/v1/demo")
public class DemoController {
@Resource
private BusProperties busProperties;
@Resource
private ApplicationEventPublisher eventPublisher;
@GetMapping("/publishEvent")
public void publishEvent(String data) {
DemoRemoteApplicationEvent demoRemoteApplicationEvent =
new DemoRemoteApplicationEvent(this, busProperties.getId(), data);
eventPublisher.publishEvent(demoRemoteApplicationEvent);
}
}
接收消息例子代码如下。
package com.gacfox.demo.order.service;
import com.gacfox.demo.product.event.DemoRemoteApplicationEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class DemoEventListener {
@EventListener
public void onDemoEvent(DemoRemoteApplicationEvent event) {
log.info("收到消息: {}", event.getData());
}
}
发送消息时,消息的源端originService
参数需要从BusProperties
中获取,我们直接使用依赖注入获取即可,消息的目的端在消息对象中我们已经定义好了,是通配符**
即所有服务都可以收到消息,最后使用ApplicationEventPublisher
发送消息即可。接收消息时,我们使用@EventListener
注解标注一个方法,消息对象会自动传入,我们在其中实现处理消息的业务逻辑。