“RabbitMQ:高级特性”的版本间差异
跳到导航
跳到搜索
(→死信队列) |
(→死信队列) |
||
第106行: | 第106行: | ||
=== 死信产生的条件 === | |||
# 消息被消费者手动拒绝(basic.reject / basic.nack),并且“requeue = false”(禁止重新入队); | # 消息被消费者手动拒绝(basic.reject / basic.nack),并且“requeue = false”(禁止重新入队); | ||
#: <syntaxhighlight lang="Java" highlight=""> | #: <syntaxhighlight lang="Java" highlight=""> | ||
第139行: | 第139行: | ||
=== 死信处理步骤 === | |||
# '''定义死信交换机'''; | # '''定义死信交换机'''; | ||
# '''定义死信队列'''; | # '''定义死信队列'''; |
2021年5月27日 (四) 18:04的版本
持久化机制
RabbitMQ 持久化机制分为:
- 交换器持久化:声明交换机的时候可以通过属性设置是否需要持久化;
// 声明交换机:第三个参数为持久化选项 channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT, true);
- 队列持久化:声明队列的时候可以设置队列是否需要持久化;
// 声明队列:第二个参数为持久化选项 channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
- 消息持久化:在发布消息的时候,通过消息属性可以设置,消息是否需要持久化;
// 发送消息:第三个参数为消息属性 //“MessageProperties.PERSISTENT_TEXT_PLAIN”:将消息标记为持久性,确保即使 RabbitMQ 重新启动,队列也不会丢失。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
- 队列持久化,不代表消息就是持久化的。
- 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘:
- 持久化消息会同时写入磁盘和内存(加快读取速度);
- 非持久化消息会在内存不够用时,将消息写入磁盘(Rabbitmq重启之后就没有了)。
消息顺序
RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
RabbitMQ的消息顺序问题,需要分三个环节看待,发送消息的顺序、队列中消息的顺序、消费消息的顺序。
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。
队列中消息的顺序
RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。
- 不同队列中的消息顺序,是没有保证的。
消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
- 例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题,通常就是一个队列只有一个消费者。
- 这样可以按顺序处理消息,但并发能力下降了,无法并发消费消息,这是个取舍问题。
- 如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度。
- 例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。
优先级队列(消息优先级)
优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。
- 只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效。
- RabbitMQ 3.5 版本以后才支持优先级队列。
RabbitMQ优先级队列使用步骤:
- 设置队列最大优先级:在声明队列的时候,通过队列属性(x-max-priority)设置队列的最大优先级,优先级的最大值为 255,官方建议最好在 1 到 10 之间。
// Java Channel ch = ...; Map<String, Object> map = new HashMap<String, Object>(); map.put("x-max-priority", 10); ch.queueDeclare(PRIORITY_QUEUE, true, false, false, map);
// SpringBoot @Bean public Queue priorityQueue() { Map<String, Object> map = new HashMap<>(); //给当前队列配置最大优先级 map.put("x-max-priority", 10); return new Queue(PRIORITY_QUEUE, true, false, false, map); }
- 只能通过声明方式设定,不能通过策略方式修改;
- 设置消息优先级:通过消息属性(Priority)
for(int i=0;i<10;i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("priority_exchange", "rk_priority", properties, message.getBytes(StandardCharsets.UTF_8)); }
// SpringBoot amqpTemplate.convertAndSend("priority_exchange", "rk_priority", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(finalI); return message; } });
优先级队列必须和优先级消息一起使用,才能发挥出效果,但是会消耗性能。
死信队列
在 RabbitMQ 中,当消息在一个队列中变成一个死信(Dead Letter,消费者无法处理的消息)之后,它将被重新投递到另一个交换机上,这个交换机我们就叫做死信交换机,死信交换机将死信投递到一个队列上就是死信队列。
- 如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
死信产生的条件
- 消息被消费者手动拒绝(basic.reject / basic.nack),并且“requeue = false”(禁止重新入队);
// 拒绝消息,给死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
- 消息 TTL(消息存活时间)过期;
@Bean public Queue queue(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME); map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY); map.put("x-message-ttl",7200); // 队列过期时间 Queue queue = new Queue(QUEUE_NAME,true,false,false,map); return queue; }
- 队列达到最大长度;
@Bean public Queue queue(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME); map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY); map.put("x-max-length",3); // 队列最大长度 // map.put("x-message-ttl",7200); // 队列过期时间 Queue queue = new Queue(QUEUE_NAME,true,false,false,map); return queue; }
死信处理步骤
- 定义死信交换机;
- 定义死信队列;
- 定义死信消费者;
- 将死信交换机绑定到指定队列;
- (死信交换机、死信队列、死信消费者,其实就是普通的交换机、队列、消费者)
示例:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
public static void main(String[] args) throws Exception {
// 设置连接以及创建 channel
String exchangeName = "test_dlx_exchange";
String routingKey = "item.update";
String msg = "this is dlx msg";
// 设置消息过期时间,10秒后再消费 让消息进入死信队列(死信条件)
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.build();
channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
System.out.println("Send message : " + msg);
channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
public class DlxConsumer {
public static void main(String[] args) throws Exception {
// 创建连接、创建 channel 忽略 内容可以在上面代码中获取
String exchangeName = "test_dlx_exchange";
String queueName = "test_dlx_queue";
String routingKey = "item.#";
// 声明交换机
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
// 声明队列
// 1、设置参数到 arguments 中
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", "dlx.exchange"); // 设置 x-dead-letter-exchange 属性为:dlx.exchange(死信交换机名称)
// 2、将 arguments 放入队列的声明中
channel.queueDeclare(queueName, true, false, false, arguments);
// 绑定
channel.queueBind(queueName, exchangeName, routingKey);
// 声明死信交换机
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
// 声明死信队列
channel.queueDeclare("dlx.queue", true, false, false, null);
// 路由键为 # 代表可以路由到所有消息
channel.queueBind("dlx.queue", "dlx.exchange", "#");
// 消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
// 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
// 死信消费者
Consumer dlxConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received Dead Letter'" + message + "'");
}
};
// 设置 Channel 消费者绑定队列
channel.basicConsume("dlx.queue", true, dlxConsumer);
}
}
消息过期时间(TTL)