RabbitMQ:高级特性
持久化机制
RabbitMQ 持久化机制分为:
- 交换器持久化:声明交换机的时候可以通过属性设置是否需要持久化;
// 声明交换机:第三个参数为持久化选项 channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT, true);
- 队列持久化:声明队列的时候可以设置队列是否需要持久化;
// 声明队列:第二个参数为持久化选项 channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
- 消息持久化:在发布消息的时候,通过消息属性可以设置,消息是否需要持久化;
// 发送消息:第三个参数为消息属性 //“MessageProperties.PERSISTENT_TEXT_PLAIN”:将消息标记为持久性,确保即使 RabbitMQ 重新启动,队列也不会丢失。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
- 队列持久化,不代表消息就是持久化的。
- 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘:
- 持久化消息会同时写入磁盘和内存(加快读取速度);
- 非持久化消息会在内存不够用时,将消息写入磁盘(Rabbitmq重启之后就没有了)。
消息顺序
RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。
- 不同队列中的消息顺序,是没有保证的。
消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
- 例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题,通常就是一个队列只有一个消费者。
- 这样可以按顺序处理消息,但并发能力下降了,无法并发消费消息,这是个取舍问题。
- 如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度。
- 例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
优先级队列(消息优先级)
优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。
- 只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。
- RabbitMQ 3.5 版本以后才支持优先级队列。
RabbitMQ优先级队列使用步骤:
- 设置队列最大优先级:在声明队列的时候,通过队列属性(x-max-priority)设置队列的最大优先级,优先级的最大值为 255,官方建议最好在 1 到 10 之间。
// Java Channel ch = ...; Map<String, Object> map = new HashMap<String, Object>(); map.put("x-max-priority", 10); ch.queueDeclare(PRIORITY_QUEUE, true, false, false, map);
// SpringBoot @Bean public Queue priorityQueue() { Map<String, Object> map = new HashMap<>(); //给当前队列配置最大优先级 map.put("x-max-priority", 10); return new Queue(PRIORITY_QUEUE, true, false, false, map); }
- 只能通过声明方式设定,不能通过策略方式修改;
- 设置消息优先级:通过消息属性(Priority)
for(int i=0;i<10;i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("priority_exchange", "rk_priority", properties, message.getBytes(StandardCharsets.UTF_8)); }
// SpringBoot amqpTemplate.convertAndSend("priority_exchange", "rk_priority", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(final1); return message; } });
优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能。
死信队列
在 RabbitMQ 中,当消息在一个队列中变成一个死信(Dead Letter,消费者无法处理的消息)之后,它将被重新投递到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列。
- 如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
产生条件
- 消息被消费者手动拒绝(basic.reject / basic.nack),并且“requeue = false”(禁止重新入队);
// 拒绝消息,给死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- 消息 TTL(消息存活时间)过期;
@Bean public Queue queue(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME); map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY); map.put("x-message-ttl",7200); // 队列过期时间 Queue queue = new Queue(QUEUE_NAME,true,false,false,map); return queue; }
- 队列达到最大长度;
@Bean public Queue queue(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME); map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY); map.put("x-max-length",3); // 队列最大长度 // map.put("x-message-ttl",7200); // 队列过期时间 Queue queue = new Queue(QUEUE_NAME,true,false,false,map); return queue; }
处理步骤
- 定义死信交换机;
- 定义死信队列;
- 定义死信消费者;
- 将死信交换机绑定到指定队列;
- (死信交换机、死信队列、死信消费者,其实就是普通的交换机、队列、消费者)
示例:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
public static void main(String[] args) throws Exception {
// 设置连接以及创建 channel
String exchangeName = "test_dlx_exchange";
String routingKey = "item.update";
String msg = "this is dlx msg";
// 设置消息过期时间,10秒后再消费 让消息进入死信队列(死信条件)
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
System.out.println("Send message : " + msg);
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DlxConsumer {
public static void main(String[] args) throws Exception {
// 创建连接、创建 channel 忽略 内容可以在上面代码中获取
String exchangeName = "test_dlx_exchange";
String queueName = "test_dlx_queue";
String routingKey = "item.#";
// 声明交换机
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// 声明队列
// 1、设置参数到 arguments 中
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange"); // 设置死信交换机名称为:dlx.exchange
// 2、将 arguments 放入队列的声明中
channel.queueDeclare(queueName, true, false, false, arguments);
// 绑定
channel.queueBind(queueName, exchangeName, routingKey);
// 声明死信交换机
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
// 声明死信队列
channel.queueDeclare("dlx.queue", true, false, false, null);
// 路由键为 # 代表可以路由到所有消息
channel.queueBind("dlx.queue", "dlx.exchange", "#");
// 消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
// 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
// 死信消费者
Consumer dlxConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received Dead Letter'" + message + "'");
}
};
// 设置 Channel 消费者绑定队列
channel.basicConsume("dlx.queue", true, dlxConsumer);
}
}
消息过期时间(TTL)
RabbitMQ的消息过期时间(TTL)下面两种方式进行设置:
- 通过队列属性设置,队列中所有消息都有相同的过期时间。
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-message-ttl", "60000"); // 设置队列消息的过期时间(单位 ms) channel.queueDeclare(queueName, true, false, false, arguments);
- 对每条消息单独设置过期时间,每条消息的TTL可以不同。
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
- 如果两种方法一起使用,则消息的 TTL 以两者之间较小的那个数值为准。
延迟队列(延迟消息)
RabbitMQ 原生不支持延迟消息,目前主要通过:
- 死信交换机 + 消息TTL 方案,
- 或者 rabbitmq-delayed-message-exchange 插件
两种方式来实现。
应用场景:
- 对消息生产和消费有时间窗口要求的场景。
- 例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。
- 通过消息触发延时任务的场景。
- 例如,在指定时间段之后向用户发送提醒消息。
死信交换机 + 消息TTL
这个方案核心思想就是,创建一个没有消费者的队列,借助消息过期时间(TTL),当一条消息过期后会成为死信,这条死信消息会投递到死信交换机,死信交换机将消息发给死信队列,我们只要消费死信队列即可。
- 在这个方案中,消息过期时间就是消息延迟时间,
- 例如: 消息 TTL=30秒,因为这个队列没有消费者,消息 30 秒后过期,这条消息就变成死信,会被死信队列处理。
【结合上面两节即可】
延迟消息插件
- 安装插件:
- 从 github 或官网插件页面, 选择下载插件对应版本,把文件(如:rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez)放到 rabbitmq 插件目录(plugins目录);
- 激活插件:
- 安装成功后,重启 RabbitMQ;
- 定义交换机:
- 通过 x-delayed-type 设置自定义交换机属性,支持发送延迟消息:
/** * 订单延迟插件消息队列所绑定的交换机 */ @Bean CustomExchange orderPluginDirect() { /** 创建一个自定义交换机,可以发送延迟消息 * * name:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange() * type:"x-delayed-message" * durable:true * autoDelete:false * props:args */ Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message", true, false, args); } /** * 订单延迟插件队列 */ @Bean public Queue orderPluginQueue() { return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName()); } /** * 将订单延迟插件队列绑定到交换机 */ @Bean public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) { return BindingBuilder .bind(orderPluginQueue) .to(orderPluginDirect) .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()) .noargs(); }
- 发送延迟消息:
- 通过消息头(x-delay),设置消息延迟时间:
/** 给延迟队列发送消息(方式一) * exchange:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange() * routeKey:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey() * message:orderId * messagePostProcessor:new MessagePostProcessor() { . . . } */ amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 给消息设置延迟毫秒值 message.getMessageProperties().setHeader("x-delay",delayTimes); return message; } }); /** 给延迟队列发送消息(方式二) * exchange:RabbitConstants.Exchange.DELAY_EXCHANGE_NAME * routeKey:RabbitConstants.routingKey.DELAY_ROUTING_KEY * message:JSON.toJSONString(message) * messagePostProcessor: message1 -> { . . . } */ rabbitTemplate.convertAndSend(RabbitConstants.Exchange.DELAY_EXCHANGE_NAME, RabbitConstants.routingKey.DELAY_ROUTING_KEY, JSON.toJSONString(message), message1 -> { message1.getMessageProperties().setDelay(delay); return message1; });
- 提示:如果你直接使用阿里云的 RabbitMQ 消息云服务,通过消息头属性(delay),设置延迟时间即可,不用安装插件,阿里云已经对 RabbitMQ 扩展了。