消息驱动Bean

EJB中的消息驱动Bean(Message-Driven Bean, MDB)需要结合JMS使用,它用于异步的消息处理。消息驱动Bean可以监听JMS中的队列(Queue)或主题(Topic),并在消息到达时触发业务逻辑。MDB的核心优势在于解耦生产者和消费者,实现高可靠性的异步通信。这篇笔记我们基于Wildfly14应用服务器介绍消息驱动Bean的开发和使用。

实现消息驱动Bean

我们这里直接以例子形式实现一个消息驱动Bean,不过开始之前我们首先需要在JavaEE应用服务器中创建JMS队列,我们这里创建一个名叫OrderQueue的队列,Servlet代码中将向这个队列发送消息,而我们的消息驱动Bean将接受消息并处理。

Wildfly14中我们创建的队列如下图所示。Wildfly14默认集成了ActiveMQ作为JMS实现,不过要记得以standalone-full.xml配置启动Wildfly。

JMS生产者代码如下所示,它就是一个简单的Servlet,访问后便会生成一条消息通过ActiveMQ传递给我们的JMS消费者,也就是消息驱动Bean。

package com.gacfox.netstore.web.servlet;

import com.gacfox.netstore.api.model.OrderDto;

import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@WebServlet(name = "OrderServlet", urlPatterns = "/order")
public class OrderServlet extends HttpServlet {
    @Resource(lookup = "java:/ConnectionFactory")
    private ConnectionFactory connectionFactory;
    @Resource(lookup = "java:/jms/queue/OrderQueue")
    private Queue queue;

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        try (JMSContext context = connectionFactory.createContext()) {
            JMSProducer producer = context.createProducer();
            OrderDto orderDto = new OrderDto("ORD001");
            ObjectMessage objectMessage = context.createObjectMessage(orderDto);
            producer.send(queue, objectMessage);
        }
    }
}

有关JMS的用法可以参考软件工程/Java/JavaEE/JMS相关章节,这里不过多赘述了。

消息驱动Bean的实现如下。

package com.gacfox.netstore.ejb;

import com.gacfox.netstore.api.model.OrderDto;
import org.jboss.logging.Logger;

import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

@MessageDriven(
        activationConfig = {
                @ActivationConfigProperty(
                        propertyName = "destinationType",
                        propertyValue = "javax.jms.Queue"
                ),
                @ActivationConfigProperty(
                        propertyName = "destination",
                        propertyValue = "java:/jms/queue/OrderQueue"
                )
        }
)
public class OrderMDB implements MessageListener {
    private static final Logger logger = Logger.getLogger(HelloServiceImpl.class);

    @Override
    public void onMessage(Message message) {
        ObjectMessage objectMessage = (ObjectMessage) message;
        try {
            OrderDto orderDto = (OrderDto) objectMessage.getObject();
            logger.infov("收到订单消息: {0}", orderDto.getOrderId());
        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
}

消息驱动Bean需要使用@MessageDriven注解标注,其中activationConfig需要配置JMS对接的类型和队列的JNDI名,如果是队列类型,类型指定为javax.jms.Queue,如果是主题类型,则需要指定javax.jms.Topic

消息驱动Bean的并发数

消息驱动Bean由EJB容器管理,它可以多线程并发的处理消息,我们可以手动指定一个并发数。

@ActivationConfigProperty(
    propertyName = "maxSession",
    propertyValue = "15")

maxSession的默认值是JavaEE应用服务器自己设定的,通常在15到20左右。

消息驱动Bean的事务管理

消息驱动Bean默认会开启事务,默认事务属性是REQUIRED,即如果调用时没有事务,容器会开启一个新事务;如果已有事务,则加入该事务。我们也可以使用@TransactionAttribute注解调整这一行为。

@TransactionAttribute(TransactionAttributeType.REQUIRED)
public class OrderMDB implements MessageListener { }
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap