EJB中的消息驱动Bean(Message-Driven Bean, MDB)需要结合JMS使用,它用于异步的消息处理。消息驱动Bean可以监听JMS中的队列(Queue)或主题(Topic),并在消息到达时触发业务逻辑。MDB的核心优势在于解耦生产者和消费者,实现高可靠性的异步通信。这篇笔记我们基于Wildfly14应用服务器介绍消息驱动Bean的开发和使用。
我们这里直接以例子形式实现一个消息驱动Bean,不过开始之前我们首先需要在JavaEE应用服务器中创建JMS队列,我们这里创建一个名叫OrderQueue
的队列,Servlet代码中将向这个队列发送消息,而我们的消息驱动Bean将接受消息并处理。
Wildfly14中我们创建的队列如下图所示。Wildfly14默认集成了ActiveMQ作为JMS实现,不过要记得以standalone-full.xml
配置启动Wildfly。
JMS生产者代码如下所示,它就是一个简单的Servlet,访问后便会生成一条消息通过ActiveMQ传递给我们的JMS消费者,也就是消息驱动Bean。
package com.gacfox.netstore.web.servlet;
import com.gacfox.netstore.api.model.OrderDto;
import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@WebServlet(name = "OrderServlet", urlPatterns = "/order")
public class OrderServlet extends HttpServlet {
@Resource(lookup = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(lookup = "java:/jms/queue/OrderQueue")
private Queue queue;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try (JMSContext context = connectionFactory.createContext()) {
JMSProducer producer = context.createProducer();
OrderDto orderDto = new OrderDto("ORD001");
ObjectMessage objectMessage = context.createObjectMessage(orderDto);
producer.send(queue, objectMessage);
}
}
}
有关JMS的用法可以参考软件工程/Java/JavaEE/JMS
相关章节,这里不过多赘述了。
消息驱动Bean的实现如下。
package com.gacfox.netstore.ejb;
import com.gacfox.netstore.api.model.OrderDto;
import org.jboss.logging.Logger;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
@MessageDriven(
activationConfig = {
@ActivationConfigProperty(
propertyName = "destinationType",
propertyValue = "javax.jms.Queue"
),
@ActivationConfigProperty(
propertyName = "destination",
propertyValue = "java:/jms/queue/OrderQueue"
)
}
)
public class OrderMDB implements MessageListener {
private static final Logger logger = Logger.getLogger(HelloServiceImpl.class);
@Override
public void onMessage(Message message) {
ObjectMessage objectMessage = (ObjectMessage) message;
try {
OrderDto orderDto = (OrderDto) objectMessage.getObject();
logger.infov("收到订单消息: {0}", orderDto.getOrderId());
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
消息驱动Bean需要使用@MessageDriven
注解标注,其中activationConfig
需要配置JMS对接的类型和队列的JNDI名,如果是队列类型,类型指定为javax.jms.Queue
,如果是主题类型,则需要指定javax.jms.Topic
。
消息驱动Bean由EJB容器管理,它可以多线程并发的处理消息,我们可以手动指定一个并发数。
@ActivationConfigProperty(
propertyName = "maxSession",
propertyValue = "15")
maxSession
的默认值是JavaEE应用服务器自己设定的,通常在15到20左右。
消息驱动Bean默认会开启事务,默认事务属性是REQUIRED
,即如果调用时没有事务,容器会开启一个新事务;如果已有事务,则加入该事务。我们也可以使用@TransactionAttribute
注解调整这一行为。
@TransactionAttribute(TransactionAttributeType.REQUIRED)
public class OrderMDB implements MessageListener { }