Bus消息总线

我们知道Spring实现了事件机制,它能够实现Event事件对象的发布订阅功能,实现低耦合、无侵入式的业务逻辑处理,也可以实现异步处理等高级功能,达到业务逻辑解耦的目标。然而扩展到微服务环境,原本Spring的事件机制在微服务环境下不能跨服务发布消息,此时我们自然就会想到需要借助消息队列实现该功能,不过SpringCloud Bus已经帮我们把这些功能都封装好了。

启动消息队列

SpringCloud Bus底层依赖消息队列来实现跨服务的消息传递,目前主要支持RabbitMQ和Kafka两种消息中间件。这里出于简单起见,我们直接使用Docker启动RabbitMQ实例作为开发环境,生产环境则可能需要搭建RabbitMQ集群,有关RabbitMQ的知识可以参考相关章节,这里不过多涉及。

docker run -id --name=rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=root rabbitmq:management

启动后,我们可以在15672端口访问RabbitMQ的网页版控制台,我们的程序则需要通过5672端口用AMQP协议连接RabbitMQ。

工程配置

在需要收发消息的服务中,我们需要添加spring-cloud-starter-bus-amqp依赖,它实现了基于RabbitMQ的消息机制。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

此外我们还需要在application.properties中配置RabbitMQ的连接信息。

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

配置自定义事件对象

SpringCloud Bus中,对于自定义事件需要继承RemoteApplicationEvent类,下面是一个例子。

package com.gacfox.demo.product.event;

import lombok.Getter;
import lombok.Setter;
import org.springframework.cloud.bus.event.RemoteApplicationEvent;

@Getter
@Setter
public class DemoRemoteApplicationEvent extends RemoteApplicationEvent {
    private String data;

    public DemoRemoteApplicationEvent() {
        super();
    }

    public DemoRemoteApplicationEvent(Object source, String originService, String data) {
        super(source, originService, () -> "**");
        this.data = data;
    }
}

代码中我们自定义了一个事件类DemoRemoteApplicationEvent,该类包含了一个data属性,它是我们自定义的数据字段,该数据随着事件对象从事件的发出端经过MQ传递到事件的消费端,这一过程中SpringCloud Bus会默认使用JSON格式传递,因此我们还要额外保证我们的事件对象能够正确被Jackson库进行序列化和反序列化。

另外事件对象还有两个重要的属性,originServicedestination,分别代表事件的发出端服务和事件的目的地址,这里我们的发出端服务由外部获取并传入,事件的目的地址是通配符**,即所有服务都会收到消息。

创建好事件对象后,我们还需要将其配置到工程中,我们可以在启动类或配置类上添加注解来配置。

package com.gacfox.demo.order;

import com.gacfox.demo.product.event.DemoRemoteApplicationEvent;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.bus.jackson.RemoteApplicationEventScan;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;

@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients(basePackages = {"com.gacfox.demo.product.api"})
@RemoteApplicationEventScan(basePackageClasses = DemoRemoteApplicationEvent.class)
public class OrderApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderApplication.class, args);
    }
}

我们使用了@RemoteApplicationEventScan注解并指定了自定义事件类,这里注意在事件发出和接收的两端都需要进行该配置。

发送和接收消息

下面代码分别在两个微服务中发送和接收消息,其中product服务负责发送,order服务负责接收。

发送消息例子代码如下。

package com.gacfox.demo.product.controller;

import com.gacfox.demo.product.event.DemoRemoteApplicationEvent;
import org.springframework.cloud.bus.BusProperties;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/api/v1/demo")
public class DemoController {
    @Resource
    private BusProperties busProperties;
    @Resource
    private ApplicationEventPublisher eventPublisher;


    @GetMapping("/publishEvent")
    public void publishEvent(String data) {
        DemoRemoteApplicationEvent demoRemoteApplicationEvent =
                new DemoRemoteApplicationEvent(this, busProperties.getId(), data);
        eventPublisher.publishEvent(demoRemoteApplicationEvent);
    }
}

接收消息例子代码如下。

package com.gacfox.demo.order.service;

import com.gacfox.demo.product.event.DemoRemoteApplicationEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DemoEventListener {
    @EventListener
    public void onDemoEvent(DemoRemoteApplicationEvent event) {
        log.info("收到消息: {}", event.getData());
    }
}

发送消息时,消息的源端originService参数需要从BusProperties中获取,我们直接使用依赖注入获取即可,消息的目的端在消息对象中我们已经定义好了,是通配符**即所有服务都可以收到消息,最后使用ApplicationEventPublisher发送消息即可。接收消息时,我们使用@EventListener注解标注一个方法,消息对象会自动传入,我们在其中实现处理消息的业务逻辑。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap