RabbitMQ:高级特性
持久化机制
RabbitMQ 持久化机制分为:
- 交换器持久化:声明交换机的时候可以通过属性设置是否需要持久化;
// 声明交换机:第三个参数为持久化选项 channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT, true);
- 队列持久化:声明队列的时候可以设置队列是否需要持久化;
// 声明队列:第二个参数为持久化选项 channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
- 消息持久化:在发布消息的时候,通过消息属性可以设置,消息是否需要持久化;
// 发送消息:第三个参数为消息属性 //“MessageProperties.PERSISTENT_TEXT_PLAIN”:将消息标记为持久性,确保即使 RabbitMQ 重新启动,队列也不会丢失。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
- 队列持久化,不代表消息就是持久化的。
- 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘:
- 持久化消息会同时写入磁盘和内存(加快读取速度);
- 非持久化消息会在内存不够用时,将消息写入磁盘(Rabbitmq重启之后就没有了)。
消息顺序
RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。
- 不同队列中的消息顺序,是没有保证的。
消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
- 例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题,通常就是一个队列只有一个消费者。
- 这样可以按顺序处理消息,但并发能力下降了,无法并发消费消息,这是个取舍问题。
- 如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度。
- 例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
优先级队列(消息优先级)
优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。
- 只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。
- RabbitMQ 3.5 版本以后才支持优先级队列。
RabbitMQ优先级队列使用步骤:
- 设置队列最大优先级:在声明队列的时候,通过队列属性(x-max-priority)设置队列的最大优先级,优先级的最大值为 255,官方建议最好在 1 到 10 之间。
// Java Channel ch = ...; Map<String, Object> map = new HashMap<String, Object>(); map.put("x-max-priority", 10); ch.queueDeclare(PRIORITY_QUEUE, true, false, false, map);
// SpringBoot @Bean public Queue priorityQueue() { Map<String, Object> map = new HashMap<>(); //给当前队列配置最大优先级 map.put("x-max-priority", 10); return new Queue(PRIORITY_QUEUE, true, false, false, map); }
- 只能通过声明方式设定,不能通过策略方式修改;
- 设置消息优先级:通过消息属性(Priority)
for(int i=0;i<10;i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("priority_exchange", "rk_priority", properties, message.getBytes(StandardCharsets.UTF_8)); }
// SpringBoot amqpTemplate.convertAndSend("priority_exchange", "rk_priority", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(finalI); return message; } });
优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能。
死信队列
在 RabbitMQ 中,当消息在一个队列中变成一个死信(消费者无法处理的消息)之后,它将被重新投递到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列。