“RabbitMQ:高级特性”的版本间差异

来自Wikioe
跳到导航 跳到搜索
无编辑摘要
第51行: 第51行:


== 优先级队列(消息优先级) ==
== 优先级队列(消息优先级) ==
优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。
* '''只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效'''。
* RabbitMQ '''3.5''' 版本以后才支持优先级队列。
RabbitMQ优先级队列使用步骤:
# '''设置队列最大优先级''':在声明队列的时候,通过队列属性('''x-max-priority''')设置队列的最大优先级,优先级的最大值为 255,官方建议最好在 1 到 10 之间。
#: <syntaxhighlight lang="Java" highlight="">
   // Java
   Channel ch = ...;
   Map<String, Object> map = new HashMap<String, Object>();
   map.put("x-max-priority", 10);
   ch.queueDeclare(PRIORITY_QUEUE, true, false, false, map);
</syntaxhighlight>
#: <syntaxhighlight lang="Java" highlight="">
   // SpringBoot
   @Bean
    public Queue priorityQueue() {
        Map<String, Object> map = new HashMap<>();
        //给当前队列配置最大优先级
        map.put("x-max-priority", 10);
        return new Queue(PRIORITY_QUEUE, true, false, false, map);
    }
</syntaxhighlight>
#* 只能通过声明方式设定,不能通过策略方式修改;
# '''设置消息优先级''':通过消息属性(Priority)
#: <syntaxhighlight lang="Java" highlight="">
   for(int i=0;i<10;i++) {
            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
            builder.priority(5);
            AMQP.BasicProperties properties = builder.build();
            channel.basicPublish("priority_exchange", "rk_priority", properties, message.getBytes(StandardCharsets.UTF_8));
        }
</syntaxhighlight>
#: <syntaxhighlight lang="Java" highlight="">
    // SpringBoot
    amqpTemplate.convertAndSend("priority_exchange", "rk_priority", message, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    message.getMessageProperties().setPriority(finalI);
                    return message;
                }
            });
</syntaxhighlight>
'''优先级队列必须和优先级消息一起使用''',才能发挥出效果,但是会消耗性能。
== 死信队列 ==
在 RabbitMQ 中,当消息在一个队列中变成一个'''死信'''(消费者无法处理的消息)之后,它将被重新投递到另一个交换机上,这个交换机我们就叫做'''死信交换机''',死信交换机将死信投递到一个队列上就是'''死信队列'''。
:

2021年5月27日 (四) 16:53的版本


持久化机制

RabbitMQ 持久化机制分为:

  1. 交换器持久化:声明交换机的时候可以通过属性设置是否需要持久化;
    // 声明交换机:第三个参数为持久化选项
    channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT, true);
    
  2. 队列持久化:声明队列的时候可以设置队列是否需要持久化;
    // 声明队列:第二个参数为持久化选项
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    
  3. 消息持久化:在发布消息的时候,通过消息属性可以设置,消息是否需要持久化;
    // 发送消息:第三个参数为消息属性
    //“MessageProperties.PERSISTENT_TEXT_PLAIN”:将消息标记为持久性,确保即使 RabbitMQ 重新启动,队列也不会丢失。
    channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
    
  • 队列持久化,不代表消息就是持久化的。
  • 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘:
    • 持久化消息会同时写入磁盘和内存(加快读取速度);
    • 非持久化消息会在内存不够用时,将消息写入磁盘(Rabbitmq重启之后就没有了)。

消息顺序

RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。

RabbitMQ:消息顺序示例.png


RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序队列中消息的顺序消费消息的顺序

发送消息的顺序

消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息

队列中消息的顺序

RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。

  • 不同队列中的消息顺序,是没有保证的。

消费消息的顺序

我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。

例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。

解决消费顺序的问题,通常就是一个队列只有一个消费者

  • 这样可以按顺序处理消息,但并发能力下降了,无法并发消费消息,这是个取舍问题。
  • 如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度。
    例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

优先级队列(消息优先级)

优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。

  • 只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效
  • RabbitMQ 3.5 版本以后才支持优先级队列。


RabbitMQ优先级队列使用步骤:

  1. 设置队列最大优先级:在声明队列的时候,通过队列属性(x-max-priority)设置队列的最大优先级,优先级的最大值为 255,官方建议最好在 1 到 10 之间。
        // Java
        Channel ch = ...;
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-max-priority", 10);
        ch.queueDeclare(PRIORITY_QUEUE, true, false, false, map);
    
        // SpringBoot
        @Bean
        public Queue priorityQueue() {
            Map<String, Object> map = new HashMap<>();
            //给当前队列配置最大优先级
            map.put("x-max-priority", 10);
            return new Queue(PRIORITY_QUEUE, true, false, false, map);
        }
    
    • 只能通过声明方式设定,不能通过策略方式修改;
  2. 设置消息优先级:通过消息属性(Priority)
        for(int i=0;i<10;i++) {
                AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
                builder.priority(5);
                AMQP.BasicProperties properties = builder.build();
                channel.basicPublish("priority_exchange", "rk_priority", properties, message.getBytes(StandardCharsets.UTF_8));
            }
    
        // SpringBoot
        amqpTemplate.convertAndSend("priority_exchange", "rk_priority", message, new MessagePostProcessor() {
                    @Override
                    public Message postProcessMessage(Message message) throws AmqpException {
                        message.getMessageProperties().setPriority(finalI);
                        return message;
                    }
                });
    


优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能。

死信队列

在 RabbitMQ 中,当消息在一个队列中变成一个死信(消费者无法处理的消息)之后,它将被重新投递到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列