JMS简介
JMS(Java Message Service)是JavaEE中针对消息中间件制定的一组API规范,它规定了在应用程序之间发送消息的接口。JMS为分布式应用程序提供了一种异步通信的方式,允许应用程序通过消息传递进行通信,而不需要直接连接,这种消息传递的方式有助于提高系统处理信息的吞吐量并降低系统之间的耦合度。具体来说,JMS常用于以下应用场景。
- 异步处理:将同步的业务流程操作(尤其是因为IO阻塞而比较耗时间的操作)通过消息机制改为异步方式,以提高CPU的利用率
- 应用解耦:将一个大项目拆分后,各个子模块之间通过消息进行通讯
- 流量削锋:如短时间内出现大量订单的情况,流量峰值可以通过消息队列实现排队,而不至于服务器崩溃
- 日志处理:日志也可能有峰值等问题存在,通过消息队列接受日志能够解决这个问题
- 消息通讯:可以用消息队列实现类似聊天室的功能
早期JMS广泛使用的版本是JMS1.1,但该版本规范的API设计比较繁琐,从JavaEE7版本开始JavaEE引入了JMS2.0标准,相比于早期版本,新版本规范有诸多改进,这篇笔记我们主要学习有关JMS2.0的使用,具体JMS实现选择Wildfly14内置的ActiveMQ Artemis,当然我们使用其它的应用服务器或是单独的JMS2.0兼容的实现都是可以的。
JMS基本概念
在具体学习JMS之前,我们需要了解JMS中的基本概念,这些基本概念构成了JMS2.0的核心。
消息(Message):在JMS中,消息是数据的载体,消息可以包含文本、对象等形式的数据。消息由生产者发送到目标,然后由消费者接收并处理。
目标(Destination):目标表示消息发送和接收的目的地。在JMS中主要有两种目标类型:队列(Queue)和主题(Topic)。队列用于点对点(PTP)通信,而主题用于发布/订阅(Pub-Sub)通信模式。
连接工厂(ConnectionFactory):连接工厂用于创建JMS连接,应用程序通过连接工厂获取连接,然后使用连接创建会话。
连接(Connection):连接表示应用程序与消息服务器之间的通信链接,应用程序通过连接与消息服务器建立通信,并且可以创建会话来发送和接收消息。
会话(Session):会话是消息的上下文,用于发送和接收消息。会话提供了创建生产者和消费者的方法,并且定义了消息的确认方式、事务模式等。
生产者(Producer):生产者是用于发送消息到目标的JMS组件。生产者可以将消息发送到队列(Queue)或主题(Topic),并且可以设置消息的属性和其他元数据。
消费者(Consumer):消费者是用于接收消息的JMS组件。消费者从队列(Queue)或主题(Topic)接收消息并处理它们。
监听器(Listener):监听器是一种用于异步接收消息的机制。通过在消费者上注册监听器,应用程序可以在消息到达时自动触发相应的处理逻辑。
消息选择器(Message Selector):消息选择器允许消费者只接收满足特定条件的消息。消费者可以使用消息选择器来过滤消息,只接收符合条件的消息。
在早期的JMS版本中,以上每一个组件都需要我们创建并正确配置,不过JMS2.0做了一些简化,它使用了一个上下文(Context)对象来维护和管理连接、会话,代码更加简洁。
JMS的消息确认模式
JMS中,消息确认模式会影响到消息传递的可靠性和消息系统的性能,我们在开发中需要充分考虑在对应的场景下选择合适的消息确认模式。JMS有以下4种消息确认模式:
AUTO_ACKNOWLEDGE:自动确认,默认值,会话会在消息监听器处理消息成功后自动确认消息,如果消息监听器处理消息时发生异常则将重发消息。
CLIENT_ACKNOWLEDGE:客户端确认,客户端需要手动调用message.acknowledge()方法确认消息。
DUPS_OK_ACKNOWLEDGE:批量自动确认,它用于需要高性能但容忍重复消息的场景,这种模式下消息确认有是批量但可能延迟的,消费节点如果出现故障就存在成功消费却没确认消息的情况,此时就可能出现消息重复。
SESSION_TRANSACTED:事务确认,这种确认模式要求在一个事务内,消息要么全部确认,要么全部回滚重发。如果重发次数达到上限,消息将被存入死信队列。
JMS收发消息例子
我们这里直接写一个简单的例子来演示如何使用JMS收发文本消息。
引入Maven依赖
在pom.xml中,我们可以直接引入JavaEE API的Maven依赖。
<dependency>
<groupId>javax</groupId>
<artifactId>javaee-api</artifactId>
<version>8.0.1</version>
<scope>provided</scope>
</dependency>
当然如果仅使用JMS,我们也可以仅引入JMS的依赖。
配置Wildfly
这里我们使用Wildfly14的内置JMS实现来演示JMS的使用,注意Wildfly默认的配置文件中可能没有启动消息服务,我们这里直接使用standalone-full.xml配置来启动应用服务器,它默认开启了大部分功能。
./standalone.sh -c standalone-full.xml
如下图,在jms-queue配置节点,我们点击Add按钮创建一个新的消息队列,这里我们填写队列名为DemoQueue,JNDI标识为java:/jms/queue/DemoQueue。

JMS消息队列创建完成后,我们可以点击查看详情信息,显示内容如下图。

此时我们就可以在JavaEE工程中使用该消息队列了。
消息生产者和消费者
简单起见,我们这里使用最基础的Servlet工程演示JMS的用法。下面例子中,我们的ProducerServlet用于生产消息,而DemoListener是一个监听器组件,它在应用程序启动的同时开启了JMS消费者并监听消息。
package com.gacfox.demo.demoweb;
import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@WebServlet(name = "ProducerServlet", urlPatterns = "/producer")
public class ProducerServlet extends HttpServlet {
@Resource(lookup = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(lookup = "java:/jms/queue/DemoQueue")
private Queue queue;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try (JMSContext context = connectionFactory.createContext()) {
JMSProducer producer = context.createProducer();
TextMessage textMessage = context.createTextMessage("Hello, JMS!");
producer.send(queue, textMessage);
}
}
}
上面代码中,我们使用@Resource注解和JNDI向Servlet组件注入了JMS的连接工厂(ConnectionFactory)和目标(Destination,这里目标是一个Queue),当用户请求/producer接口时doGet()方法就会执行,创建JMS消息生产者并向消息队列写入一条消息。
package com.gacfox.demo.demoweb.listerner;
import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
@WebListener
public class DemoListener implements ServletContextListener {
@Resource(lookup = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(lookup = "java:/jms/queue/DemoQueue")
private Queue queue;
private JMSContext context;
@Override
public void contextInitialized(ServletContextEvent sce) {
context = connectionFactory.createContext();
JMSConsumer consumer = context.createConsumer(queue);
consumer.setMessageListener(message -> {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
if (context != null) {
context.close();
}
}
}
DemoListener实现了ServletContextListener接口,contextInitialized()方法会在应用程序启动时执行,具体的代码中我们创建了JMSContext并创建了JMS消费者,JMS上下文会保持开启直到应用程序关闭。对于JMS消费者,我们使用了consumer.setMessageListener()方法为其指定了消息监听器,注意消息监听器中的代码实际上是在子线程中执行的,只要JMSContext没有被关闭,当队列中存在消息时,消费者就会消费这些消息。