JMS中传递消息有两种模型,点对点模式(PTP,Point to Point)和发布订阅模式(Pub-Sub)。点对点模式的基于队列(Queue)的,消息生产者(Producer)将消息发送到队列,消息消费者(Consumer)从队列中接收消息;发布订阅模式则是一种广播消息模式,它是基于主题(Topic)的,消息发布者(Publisher)将消息发布到主题,消息订阅者(Subscriber)可以选择订阅感兴趣的主题以接收相关消息。
不过实际上,无论点对点模式还是发布订阅模式,它们的使用方法都是类似的,这篇笔记我们介绍两种模式的使用。
前一篇笔记中我们介绍过,JMS中有一个核心概念是目标(Destination),实际上,队列(Queue)和主题(Topic)接口继承了目标接口,因此它们的使用方式十分类似。
在Wildfly中,我们可以分别在jms-queue
和jms-topic
节点对消息队列和主题进行配置。
在点对点消息模式中,消息生产者生产消息发送到消息队列(Queue)中,消息消费者从消息队列取出并消费消息。消息一旦被消费以后,消息队列中就不再有这一条可消费的消息了,因此消息消费者不可能消费到已经被自己或其它消费者消费的消息。消息队列支持存在多个消费者,但是对一个消息而言,只会有一个消费者将其消费。
下面例子基于JMS点对点模式编写了消息生产和消费的示例代码。
package com.gacfox.demo.demoweb;
import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@WebServlet(name = "ProducerServlet", urlPatterns = "/producer")
public class ProducerServlet extends HttpServlet {
@Resource(lookup = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(lookup = "java:/jms/queue/DemoQueue")
private Queue queue;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try (JMSContext context = connectionFactory.createContext()) {
JMSProducer producer = context.createProducer();
TextMessage textMessage = context.createTextMessage("Hello, JMS!");
producer.send(queue, textMessage);
}
}
}
package com.gacfox.demo.demoweb.listerner;
import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
@WebListener
public class DemoListener implements ServletContextListener {
@Resource(lookup = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(lookup = "java:/jms/queue/DemoQueue")
private Queue queue;
private JMSContext context;
@Override
public void contextInitialized(ServletContextEvent sce) {
context = connectionFactory.createContext();
JMSConsumer consumer = context.createConsumer(queue);
consumer.setMessageListener(message -> {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
if (context != null) {
context.close();
}
}
}
发布订阅模式中,消息生产者将消息发布到主题(Topic)中,但同时有多个消息消费者消费该消息,这和点对点消息模式不同,发布到主题的消息会被所有消费者消费。在发布订阅模式中,我们也经常将消息生产者称为消息发布者,将消息消费者称为消息订阅者。
下面例子基于JMS发布订阅模式编写了消息生产和消费的示例代码。
package com.gacfox.demo.demoweb;
import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
@WebServlet(name = "ProducerServlet", urlPatterns = "/producer")
public class ProducerServlet extends HttpServlet {
@Resource(lookup = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(lookup = "java:/jms/topic/DemoTopic")
private Topic topic;
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
try (JMSContext context = connectionFactory.createContext()) {
JMSProducer producer = context.createProducer();
TextMessage textMessage = context.createTextMessage("Hello, JMS!");
producer.send(topic, textMessage);
}
}
}
package com.gacfox.demo.demoweb.listerner;
import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
@WebListener
public class DemoListener implements ServletContextListener {
@Resource(lookup = "java:/ConnectionFactory")
private ConnectionFactory connectionFactory;
@Resource(lookup = "java:/jms/topic/DemoTopic")
private Topic topic;
private JMSContext context;
@Override
public void contextInitialized(ServletContextEvent sce) {
context = connectionFactory.createContext();
JMSConsumer consumer = context.createConsumer(topic);
consumer.setMessageListener(message -> {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
if (context != null) {
context.close();
}
}
}