RabbitMQ:高级特性

来自Wikioe
跳到导航 跳到搜索


持久化机制

RabbitMQ 持久化机制分为:

  1. 交换器持久化:声明交换机的时候可以通过属性设置是否需要持久化;
    // 声明交换机:第三个参数为持久化选项
    channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT, true);
    
  2. 队列持久化:声明队列的时候可以设置队列是否需要持久化;
    // 声明队列:第二个参数为持久化选项
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    
  3. 消息持久化:在发布消息的时候,通过消息属性可以设置,消息是否需要持久化;
    // 发送消息:第三个参数为消息属性
    //“MessageProperties.PERSISTENT_TEXT_PLAIN”:将消息标记为持久性,确保即使 RabbitMQ 重新启动,队列也不会丢失。
    channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
    
  • 队列持久化,不代表消息就是持久化的。
  • 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘:
    • 持久化消息会同时写入磁盘和内存(加快读取速度);
    • 非持久化消息会在内存不够用时,将消息写入磁盘(Rabbitmq重启之后就没有了)。

消息顺序

RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。

RabbitMQ:消息顺序示例.png


RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序队列中消息的顺序消费消息的顺序

发送消息的顺序

消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息

队列中消息的顺序

RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。

  • 不同队列中的消息顺序,是没有保证的。

消费消息的顺序

我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。

例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题,通常就是一个队列只有一个消费者
  • 这样可以按顺序处理消息,但并发能力下降了,无法并发消费消息,这是个取舍问题。
  • 如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度。
    例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

优先级队列(消息优先级)

优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。

  • 只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效
  • RabbitMQ 3.5 版本以后才支持优先级队列。


RabbitMQ优先级队列使用步骤:

  1. 设置队列最大优先级:在声明队列的时候,通过队列属性(x-max-priority)设置队列的最大优先级,优先级的最大值为 255,官方建议最好在 1 到 10 之间。
        // Java
        Channel ch = ...;
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-max-priority", 10);
        ch.queueDeclare(PRIORITY_QUEUE, true, false, false, map);
    
        // SpringBoot
        @Bean
        public Queue priorityQueue() {
            Map<String, Object> map = new HashMap<>();
            //给当前队列配置最大优先级
            map.put("x-max-priority", 10);
            return new Queue(PRIORITY_QUEUE, true, false, false, map);
        }
    
    • 只能通过声明方式设定,不能通过策略方式修改;
  2. 设置消息优先级:通过消息属性(Priority
        for(int i=0;i<10;i++) {
                AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
                builder.priority(5);
                AMQP.BasicProperties properties = builder.build();
                channel.basicPublish("priority_exchange", "rk_priority", properties, message.getBytes(StandardCharsets.UTF_8));
            }
    
        // SpringBoot
        amqpTemplate.convertAndSend("priority_exchange", "rk_priority", message, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setPriority(final1);
                        return message;
                    }
                });
    


优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能。

死信队列

在 RabbitMQ 中,当消息在一个队列中变成一个死信Dead Letter,消费者无法处理的消息)之后,它将被重新投递到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列

RabbitMQ:死信队列.png
  • 如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。


产生条件

  1. 消息被消费者手动拒绝(basic.reject / basic.nack),并且“requeue = false”(禁止重新入队);
        // 拒绝消息,给死信队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
    
  2. 消息 TTL(消息存活时间)过期;
        @Bean
        public Queue queue(){
            Map<String,Object> map = new HashMap<>();
            map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
            map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
            map.put("x-message-ttl",7200); // 队列过期时间
            Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
            return queue;
        }
    
  3. 队列达到最大长度;
        @Bean
        public Queue queue(){
            Map<String,Object> map = new HashMap<>();
            map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME);
            map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY);
            map.put("x-max-length",3); // 队列最大长度
    //        map.put("x-message-ttl",7200); // 队列过期时间
            Queue queue = new Queue(QUEUE_NAME,true,false,false,map);
            return queue;
        }
    

处理步骤

  1. 定义死信交换机
  2. 定义死信队列
  3. 定义死信消费者
  4. 将死信交换机绑定到指定队列
  • (死信交换机、死信队列、死信消费者,其实就是普通的交换机、队列、消费者)

示例:

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
    public static void main(String[] args) throws Exception {
        // 设置连接以及创建 channel 
        String exchangeName = "test_dlx_exchange";
        String routingKey = "item.update";
      
        String msg = "this is dlx msg";
 
        // 设置消息过期时间,10秒后再消费 让消息进入死信队列(死信条件)
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                .deliveryMode(2)
                .expiration("10000")
                .build();
 
        channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
        System.out.println("Send message : " + msg);
 
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
 
public class DlxConsumer {
    public static void main(String[] args) throws Exception {
        // 创建连接、创建 channel 忽略 内容可以在上面代码中获取
        String exchangeName = "test_dlx_exchange";
        String queueName = "test_dlx_queue";
        String routingKey = "item.#";
 
        // 声明交换机
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        // 声明队列
        // 1、设置参数到 arguments 中
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange", "dlx.exchange");  // 设置死信交换机名称为:dlx.exchange
        // 2、将 arguments 放入队列的声明中
        channel.queueDeclare(queueName, true, false, false, arguments);
        // 绑定
        channel.queueBind(queueName, exchangeName, routingKey);
 
 
        // 声明死信交换机
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        // 声明死信队列
        channel.queueDeclare("dlx.queue", true, false, false, null);
        // 路由键为 # 代表可以路由到所有消息
        channel.queueBind("dlx.queue", "dlx.exchange", "#");
 
 
        // 消费者
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        // 设置 Channel 消费者绑定队列
        channel.basicConsume(queueName, true, consumer);
        
        
        // 死信消费者
        Consumer dlxConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received Dead Letter'" + message + "'");
            }
        };
        // 设置 Channel 消费者绑定队列
        channel.basicConsume("dlx.queue", true, dlxConsumer);
    }
}

消息过期时间(TTL)

RabbitMQ的消息过期时间(TTL)下面两种方式进行设置:

  1. 通过队列属性设置,队列中所有消息都有相同的过期时间。
        Map<String, Object> arguments = new HashMap<String, Object>();
        arguments.put("x-message-ttl", "60000");  // 设置队列消息的过期时间(单位 ms)
    
        channel.queueDeclare(queueName, true, false, false, arguments);
    
  2. 对每条消息单独设置过期时间,每条消息的TTL可以不同。
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)
                    .expiration("10000")
                    .build();
     
        channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
    
  • 如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准

延迟队列(延迟消息)

RabbitMQ 原生不支持延迟消息,目前主要通过:

  1. 死信交换机 + 消息TTL 方案,
  2. 或者 rabbitmq-delayed-message-exchange 插件

两种方式来实现。


应用场景:

  • 对消息生产和消费有时间窗口要求的场景。
    例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
  • 通过消息触发延时任务的场景。
    例如,在指定时间段之后向用户发送提醒消息。

死信交换机 + 消息TTL

这个方案核心思想就是,创建一个没有消费者的队列,借助消息过期时间(TTL),当一条消息过期后会成为死信,这条死信消息会投递到死信交换机,死信交换机将消息发给死信队列,我们只要消费死信队列即可。

  • 在这个方案中,消息过期时间就是消息延迟时间,
    例如: 消息 TTL=30秒,因为这个队列没有消费者,消息 30 秒后过期,这条消息就变成死信,会被死信队列处理。


【结合上面两节即可】

延迟消息插件

  1. 安装插件
    从 github 或官网插件页面, 选择下载插件对应版本,把文件(如:rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez)放到 rabbitmq 插件目录(plugins目录);
  2. 激活插件
    (RabbitMQ安装目录的sbin目录下执行)
    rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    
    RabbitMQ:安装插件rabbitmq delayed message exchange.png
    • 安装成功后,重启 RabbitMQ;
  3. 定义交换机
    通过 x-delayed-type 设置自定义交换机属性,支持发送延迟消息:
        /**
         * 订单延迟插件消息队列所绑定的交换机
         */
        @Bean
        CustomExchange  orderPluginDirect() {
            /** 创建一个自定义交换机,可以发送延迟消息
             * 
             * name:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange()
             * type:"x-delayed-message"
             * durable:true
             * autoDelete:false
             * props:args
             */
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message", true, false, args);
        }
    
        /**
         * 订单延迟插件队列
         */
        @Bean
        public Queue orderPluginQueue() {
            return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName());
        }
    
        /**
         * 将订单延迟插件队列绑定到交换机
         */
        @Bean
        public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) {
            return BindingBuilder
                    .bind(orderPluginQueue)
                    .to(orderPluginDirect)
                    .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey())
                    .noargs();
        }
    
  4. 发送延迟消息
    通过消息头(x-delay),设置消息延迟时间:
        /** 给延迟队列发送消息(方式一)
        * exchange:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange()
        * routeKey:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()
        * message:orderId
        * messagePostProcessor:new MessagePostProcessor() { . . . }
        */
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                // 给消息设置延迟毫秒值
                message.getMessageProperties().setHeader("x-delay",delayTimes);
                return message;
            }
        });
        
        
        /** 给延迟队列发送消息(方式二)
        * exchange:RabbitConstants.Exchange.DELAY_EXCHANGE_NAME
        * routeKey:RabbitConstants.routingKey.DELAY_ROUTING_KEY
        * message:JSON.toJSONString(message)
        * messagePostProcessor: message1 -> { . . . }
        */
        rabbitTemplate.convertAndSend(RabbitConstants.Exchange.DELAY_EXCHANGE_NAME, RabbitConstants.routingKey.DELAY_ROUTING_KEY,
                    JSON.toJSONString(message), message1 -> {
                        message1.getMessageProperties().setDelay(delay);
                        return message1;
                    });
    


  • 提示:如果你直接使用阿里云的 RabbitMQ 消息云服务,通过消息头属性(delay),设置延迟时间即可,不用安装插件,阿里云已经对 RabbitMQ 扩展了。