RabbitMQ的编程接口

RabbitMQ实现了大多数主流语言的客户端,这里以Java为例进行介绍。

添加Maven依赖

使用RabbitMQ,需要amqp-client这个依赖,除此之外,还需要日志模块logback-classic

<dependencies>
  <dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.5.3</version>
  </dependency>
  <dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.2.3</version>
  </dependency>
</dependencies>

生产消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RabbitProducer
{
    private static final String EXCHANGE_NAME = "exchange1";
    private static final String ROUTING_KEY = "route_key_1";
    private static final String QUEUE_NAME = "queue1";

    private static final String HOST = "192.168.0.152";
    private static final int PORT = 5672;

    public static void main(String[] args)
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");

        try
        {
            Connection connection = connectionFactory.newConnection();
            // 声明一个信道
            Channel channel = connection.createChannel();
            // 声明一个Exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
            // 声明一个队列
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            // 通过路由键将Exchange和队列绑定
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);

            // 通过信道发送一条纯文本消息
            String msg = "Hello, world!";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.TEXT_PLAIN, msg.getBytes());

            channel.close();
            connection.close();
        }
        catch (IOException | TimeoutException e)
        {
            e.printStackTrace();
        }
    }
}

注意MessageProperties.TEXT_PLAIN这个参数,我们点进去源码发现它是一个AMQP.BasicProperties类型,这个工具用于表示一个消息配置。MessageProperties.TEXT_PLAIN实际上是一个预定义的消息配置,我们也可以手动创建这样的配置,并调用basicPublish()的另一个重载传入:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.contentType("text/plain");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, properties, msg.getBytes());

配置类中还有其它选项可用,比如超时时间,消息优先级等。这里就不多做介绍了。

消费消息

推模式消费消息

下面这种接收消息的方式称为推模式,消费者端一直监听着消息队列,如果有消息就立即消费,这也是最常见的消费模式。

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;

public class RabbitConsumer
{
    private static final String QUEUE_NAME = "queue1";

    private static final String HOST = "192.168.0.152";
    private static final int PORT = 5672;

    public static void main(String[] args)
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(HOST);
        connectionFactory.setPort(PORT);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("123456");
        try
        {
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();
            // 设置当前最大同时处理的消息数,以免大量消息造成过大的线程开销
            channel.basicQos(1);

            // 创建一个消费者对象,定义处理函数
            Consumer consumer = new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
                {
                    // 打印字符串消息
                    System.out.println(new String(body));
                    // 确认消息
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            channel.basicConsume(QUEUE_NAME, consumer);

            // 收到新消息后,Consumer中的处理函数会以新线程方式执行,这里是为了防止主线程退出的代码
            System.out.println("输入任意字符退出程序");
            Scanner scanner = new Scanner(System.in);
            scanner.next();

            channel.close();
            connection.close();

        }
        catch (IOException | TimeoutException e)
        {
            e.printStackTrace();
        }
    }
}

上面消费者代码中,我们主要关注消息接收部分,我们新创建了一个Consumer实例,其中的handleDelivery()会根据我们指定的线程数进行多线程的消息处理。上面代码中处理过程比较简单,只是打印了一下消息,然后调用basicAck()进行确认。如果消息未确认,RabbitMQ会一直为我们保留着消息,不用担心消费者程序卡死而造成消息丢失。

拉模式消费消息

拉模式中,消费者不会一直监听消息队列,而是在需要时,去消息队列主动拉取一条消息。基于之前的代码,这里展示如何实现拉模式。

GetResponse response = channel.basicGet(QUEUE_NAME, false);
if(response != null)
{
  System.out.println(new String(response.getBody()));
  channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
else
{
  System.out.println("现在还没有消息");
}

注:channel.basicGet()并不会把线程阻塞掉,如果现在没有消息,会立即返回null

拒绝一条消息

消费端如果发现一条消息处理不了,这时可以明确的拒绝一条消息,拒绝消息时,我们可以指定这条消息删除还是重新放入消息队列。

例子代码:

channel.basicNack(envelope.getDeliveryTag(), false, false);

其中第三个布尔类型参数requeue用于指定是否将消息重新放回队列,这里要注意,重新放回队列后,如果没有能够正确处理这条消息的机制,该消费者还是可能收到该消息,这就形成了死循环,会耗尽消息队列的性能,使用nack时这一点要尤其注意。

作者:Gacfox
版权声明:本网站为非盈利性质,文章如非特殊说明均为原创,版权遵循知识共享协议CC BY-NC-ND 4.0进行授权,转载必须署名,禁止用于商业目的或演绎修改后转载。