查看“RabbitMQ:高级特性”的源代码
←
RabbitMQ:高级特性
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:RabbitMQ]] == 持久化机制 == RabbitMQ 持久化机制分为: # '''交换器持久化''':声明交换机的时候可以通过属性设置是否需要持久化; #: <syntaxhighlight lang="Java" highlight=""> // 声明交换机:第三个参数为持久化选项 channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT, true); </syntaxhighlight> # '''队列持久化''':声明队列的时候可以设置队列是否需要持久化; #: <syntaxhighlight lang="Java" highlight=""> // 声明队列:第二个参数为持久化选项 channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); </syntaxhighlight> # '''消息持久化''':在发布消息的时候,通过消息属性可以设置,消息是否需要持久化; #: <syntaxhighlight lang="Java" highlight=""> // 发送消息:第三个参数为消息属性 //“MessageProperties.PERSISTENT_TEXT_PLAIN”:将消息标记为持久性,确保即使 RabbitMQ 重新启动,队列也不会丢失。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); </syntaxhighlight> * 队列持久化,不代表消息就是持久化的。 * 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘: ** 持久化消息会同时写入磁盘和内存(加快读取速度); ** 非持久化消息会在内存不够用时,将消息写入磁盘(Rabbitmq重启之后就没有了)。 == 消息顺序 == RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。 : [[File:RabbitMQ:消息顺序示例.png|400px]] RabbitMQ的消息顺序问题,需要分三个环节看待,'''发送消息的顺序'''、'''队列中消息的顺序'''、'''消费消息的顺序'''。 === 发送消息的顺序 === 消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能'''全局加锁'''一个个的操作,一个个的发消息,'''不能并发发送消息'''。 === 队列中消息的顺序 === RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。 * 不同队列中的消息顺序,是没有保证的。 === 消费消息的顺序 === 我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。 : 例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。 '''解决消费顺序的问题,通常就是一个队列只有一个消费者'''。 * 这样可以按顺序处理消息,但并发能力下降了,无法并发消费消息,这是个取舍问题。 * 如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度。 *: 例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。 == 优先级队列(消息优先级) == 优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。 * '''只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效'''。 * RabbitMQ '''3.5''' 版本以后才支持优先级队列。 RabbitMQ优先级队列使用步骤: # '''设置队列最大优先级''':在声明队列的时候,通过队列属性('''x-max-priority''')设置队列的最大优先级,优先级的最大值为 255,官方建议最好在 1 到 10 之间。 #: <syntaxhighlight lang="Java" highlight=""> // Java Channel ch = ...; Map<String, Object> map = new HashMap<String, Object>(); map.put("x-max-priority", 10); ch.queueDeclare(PRIORITY_QUEUE, true, false, false, map); </syntaxhighlight> #: <syntaxhighlight lang="Java" highlight=""> // SpringBoot @Bean public Queue priorityQueue() { Map<String, Object> map = new HashMap<>(); //给当前队列配置最大优先级 map.put("x-max-priority", 10); return new Queue(PRIORITY_QUEUE, true, false, false, map); } </syntaxhighlight> #* 只能通过声明方式设定,不能通过策略方式修改; # '''设置消息优先级''':通过消息属性(Priority) #: <syntaxhighlight lang="Java" highlight=""> for(int i=0;i<10;i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("priority_exchange", "rk_priority", properties, message.getBytes(StandardCharsets.UTF_8)); } </syntaxhighlight> #: <syntaxhighlight lang="Java" highlight=""> // SpringBoot amqpTemplate.convertAndSend("priority_exchange", "rk_priority", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(finalI); return message; } }); </syntaxhighlight> '''优先级队列必须和优先级消息一起使用''',才能发挥出效果,但是会消耗性能。 == 死信队列 == 在 RabbitMQ 中,当消息在一个队列中变成一个'''死信'''('''Dead Letter''',消费者无法处理的消息)之后,它将被重新投递到另一个交换机上,这个交换机我们就叫做'''死信交换机''',死信交换机将死信投递到一个队列上就是'''死信队列'''。 : [[File:RabbitMQ:死信队列.png|800px]] * 如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。 === 产生条件 === # 消息被消费者手动拒绝(basic.reject / basic.nack),并且“requeue = false”(禁止重新入队); #: <syntaxhighlight lang="Java" highlight=""> // 拒绝消息,给死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); </syntaxhighlight> # 消息 TTL(消息存活时间)过期; #: <syntaxhighlight lang="Java" highlight=""> @Bean public Queue queue(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME); map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY); map.put("x-message-ttl",7200); // 队列过期时间 Queue queue = new Queue(QUEUE_NAME,true,false,false,map); return queue; } </syntaxhighlight> # 队列达到最大长度; #: <syntaxhighlight lang="Java" highlight=""> @Bean public Queue queue(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME); map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY); map.put("x-max-length",3); // 队列最大长度 // map.put("x-message-ttl",7200); // 队列过期时间 Queue queue = new Queue(QUEUE_NAME,true,false,false,map); return queue; } </syntaxhighlight> === 处理步骤 === # '''定义死信交换机'''; # '''定义死信队列'''; # '''定义死信消费者'''; # '''将死信交换机绑定到指定队列'''; *(死信交换机、死信队列、死信消费者,其实就是普通的交换机、队列、消费者) 示例: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class DlxProducer { public static void main(String[] args) throws Exception { // 设置连接以及创建 channel String exchangeName = "test_dlx_exchange"; String routingKey = "item.update"; String msg = "this is dlx msg"; // 设置消息过期时间,10秒后再消费 让消息进入死信队列(死信条件) AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes()); System.out.println("Send message : " + msg); channel.close(); connection.close(); } } </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class DlxConsumer { public static void main(String[] args) throws Exception { // 创建连接、创建 channel 忽略 内容可以在上面代码中获取 String exchangeName = "test_dlx_exchange"; String queueName = "test_dlx_queue"; String routingKey = "item.#"; // 声明交换机 channel.exchangeDeclare(exchangeName, "topic", true, false, null); // 声明队列 // 1、设置参数到 arguments 中 Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "dlx.exchange"); // 设置 x-dead-letter-exchange 属性为:dlx.exchange(死信交换机名称) // 2、将 arguments 放入队列的声明中 channel.queueDeclare(queueName, true, false, false, arguments); // 绑定 channel.queueBind(queueName, exchangeName, routingKey); // 声明死信交换机 channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); // 声明死信队列 channel.queueDeclare("dlx.queue", true, false, false, null); // 路由键为 # 代表可以路由到所有消息 channel.queueBind("dlx.queue", "dlx.exchange", "#"); // 消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; // 设置 Channel 消费者绑定队列 channel.basicConsume(queueName, true, consumer); // 死信消费者 Consumer dlxConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received Dead Letter'" + message + "'"); } }; // 设置 Channel 消费者绑定队列 channel.basicConsume("dlx.queue", true, dlxConsumer); } } </syntaxhighlight> == 消息过期时间(TTL) == RabbitMQ的消息过期时间(TTL)下面两种方式进行设置: # 通过'''队列属性'''设置,队列中所有消息都有相同的过期时间。 #: <syntaxhighlight lang="Java" highlight=""> Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "dlx.exchange"); // 设置 x-dead-letter-exchange 属性为:dlx.exchange(死信交换机名称) channel.queueDeclare(queueName, true, false, false, arguments); </syntaxhighlight> # 对每条'''消息单独设置'''过期时间,每条消息的TTL可以不同。 #: <syntaxhighlight lang="Java" highlight=""> AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes()); </syntaxhighlight> * 如果两种方法一起使用,则消息的 TTL 以两者之间'''较小的那个数值为准'''。 == 延迟队列(延迟消息) == RabbitMQ 原生不支持延迟消息,目前主要通过: # 死信交换机 + 消息TTL 方案, # 或者 '''rabbitmq-delayed-message-exchange''' 插件 两种方式来实现。 应用场景: * 对消息生产和消费有时间窗口要求的场景。 *: 例如,在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。 * 通过消息触发延时任务的场景。 *: 例如,在指定时间段之后向用户发送提醒消息。 === 死信交换机 + 消息TTL === 这个方案核心思想就是,创建一个没有消费者的队列,借助消息过期时间(TTL),当一条消息过期后会成为死信,这条死信消息会投递到死信交换机,死信交换机将消息发给死信队列,我们只要消费死信队列即可。 * 在这个方案中,消息过期时间就是消息延迟时间, *: 例如: 消息 TTL=30秒,因为这个队列没有消费者,消息 30 秒后过期,这条消息就变成死信,会被死信队列处理。 【结合上面两节即可】 === 延迟消息插件 === # '''安装插件''': #: 从 github 或官网插件页面, 选择下载插件对应版本,把文件(如:rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez)放到 rabbitmq 插件目录('''plugins'''目录); #* github 插件地址:[https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases] #* 官网插件页面地址:[https://www.rabbitmq.com/community-plugins.html https://www.rabbitmq.com/community-plugins.html] # '''激活插件''': #: (RabbitMQ安装目录的sbin目录下执行) #: <syntaxhighlight lang="bash" highlight=""> rabbitmq-plugins enable rabbitmq_delayed_message_exchange </syntaxhighlight> #: [[File:RabbitMQ:安装插件rabbitmq_delayed_message_exchange.png|800px]] #* 安装成功后,重启 RabbitMQ; # '''定义交换机''': #: 通过 '''x-delayed-type''' 设置自定义'''交换机属性''',支持发送延迟消息: #: <syntaxhighlight lang="Java" highlight="15"> /** * 订单延迟插件消息队列所绑定的交换机 */ @Bean CustomExchange orderPluginDirect() { /** 创建一个自定义交换机,可以发送延迟消息 * * name:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange() * type:"x-delayed-message" * durable:true * autoDelete:false * props:args */ Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message", true, false, args); } /** * 订单延迟插件队列 */ @Bean public Queue orderPluginQueue() { return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName()); } /** * 将订单延迟插件队列绑定到交换机 */ @Bean public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) { return BindingBuilder .bind(orderPluginQueue) .to(orderPluginDirect) .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()) .noargs(); } </syntaxhighlight> # '''发送延迟消息''': #: 通过消息头('''x-delay'''),设置消息延迟时间: #: <syntaxhighlight lang="Java" highlight="11,25"> /** 给延迟队列发送消息(方式一) * exchange:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange() * routeKey:QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey() * message:orderId * messagePostProcessor:new MessagePostProcessor() { . . . } */ amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 给消息设置延迟毫秒值 message.getMessageProperties().setHeader("x-delay",delayTimes); return message; } }); /** 给延迟队列发送消息(方式二) * exchange:RabbitConstants.Exchange.DELAY_EXCHANGE_NAME * routeKey:RabbitConstants.routingKey.DELAY_ROUTING_KEY * message:JSON.toJSONString(message) * messagePostProcessor: message1 -> { . . . } */ rabbitTemplate.convertAndSend(RabbitConstants.Exchange.DELAY_EXCHANGE_NAME, RabbitConstants.routingKey.DELAY_ROUTING_KEY, JSON.toJSONString(message), message1 -> { message1.getMessageProperties().setDelay(delay); return message1; }); </syntaxhighlight> * 提示:如果你直接使用阿里云的 RabbitMQ 消息云服务,通过消息头属性('''delay'''),设置延迟时间即可,不用安装插件,阿里云已经对 RabbitMQ 扩展了。
返回至“
RabbitMQ:高级特性
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息