点对点和发布订阅

JMS中传递消息有两种模型,点对点模式(PTP,Point to Point)和发布订阅模式(Pub-Sub)。点对点模式的基于队列(Queue)的,消息生产者(Producer)将消息发送到队列,消息消费者(Consumer)从队列中接收消息;发布订阅模式则是一种广播消息模式,它是基于主题(Topic)的,消息发布者(Publisher)将消息发布到主题,消息订阅者(Subscriber)可以选择订阅感兴趣的主题以接收相关消息。

不过实际上,无论点对点模式还是发布订阅模式,它们的使用方法都是类似的,这篇笔记我们介绍两种模式的使用。

队列、主题和目标

前一篇笔记中我们介绍过,JMS中有一个核心概念是目标(Destination),实际上,队列(Queue)和主题(Topic)接口继承了目标接口,因此它们的使用方式十分类似。

在Wildfly中,我们可以分别在jms-queuejms-topic节点对消息队列和主题进行配置。

点对点模式

在点对点消息模式中,消息生产者生产消息发送到消息队列(Queue)中,消息消费者从消息队列取出并消费消息。消息一旦被消费以后,消息队列中就不再有这一条可消费的消息了,因此消息消费者不可能消费到已经被自己或其它消费者消费的消息。消息队列支持存在多个消费者,但是对一个消息而言,只会有一个消费者将其消费。

下面例子基于JMS点对点模式编写了消息生产和消费的示例代码。

package com.gacfox.demo.demoweb;

import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@WebServlet(name = "ProducerServlet", urlPatterns = "/producer")
public class ProducerServlet extends HttpServlet {
    @Resource(lookup = "java:/ConnectionFactory")
    private ConnectionFactory connectionFactory;
    @Resource(lookup = "java:/jms/queue/DemoQueue")
    private Queue queue;

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        try (JMSContext context = connectionFactory.createContext()) {
            JMSProducer producer = context.createProducer();
            TextMessage textMessage = context.createTextMessage("Hello, JMS!");
            producer.send(queue, textMessage);
        }
    }
}
package com.gacfox.demo.demoweb.listerner;

import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

@WebListener
public class DemoListener implements ServletContextListener {
    @Resource(lookup = "java:/ConnectionFactory")
    private ConnectionFactory connectionFactory;
    @Resource(lookup = "java:/jms/queue/DemoQueue")
    private Queue queue;
    private JMSContext context;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        context = connectionFactory.createContext();
        JMSConsumer consumer = context.createConsumer(queue);
        consumer.setMessageListener(message -> {
            try {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        if (context != null) {
            context.close();
        }
    }
}

发布订阅模式

发布订阅模式中,消息生产者将消息发布到主题(Topic)中,但同时有多个消息消费者消费该消息,这和点对点消息模式不同,发布到主题的消息会被所有消费者消费。在发布订阅模式中,我们也经常将消息生产者称为消息发布者,将消息消费者称为消息订阅者。

下面例子基于JMS发布订阅模式编写了消息生产和消费的示例代码。

package com.gacfox.demo.demoweb;

import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@WebServlet(name = "ProducerServlet", urlPatterns = "/producer")
public class ProducerServlet extends HttpServlet {
    @Resource(lookup = "java:/ConnectionFactory")
    private ConnectionFactory connectionFactory;
    @Resource(lookup = "java:/jms/topic/DemoTopic")
    private Topic topic;

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        try (JMSContext context = connectionFactory.createContext()) {
            JMSProducer producer = context.createProducer();
            TextMessage textMessage = context.createTextMessage("Hello, JMS!");
            producer.send(topic, textMessage);
        }
    }
}
package com.gacfox.demo.demoweb.listerner;

import javax.annotation.Resource;
import javax.jms.*;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;

@WebListener
public class DemoListener implements ServletContextListener {
    @Resource(lookup = "java:/ConnectionFactory")
    private ConnectionFactory connectionFactory;
    @Resource(lookup = "java:/jms/topic/DemoTopic")
    private Topic topic;
    private JMSContext context;

    @Override
    public void contextInitialized(ServletContextEvent sce) {
        context = connectionFactory.createContext();
        JMSConsumer consumer = context.createConsumer(topic);
        consumer.setMessageListener(message -> {
            try {
                TextMessage textMessage = (TextMessage) message;
                System.out.println(textMessage.getText());
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        });
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        if (context != null) {
            context.close();
        }
    }
}
作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。
Copyright © 2017-2024 Gacfox All Rights Reserved.
Build with NextJS | Sitemap