查看“RabbitMQ:高级特性”的源代码
←
RabbitMQ:高级特性
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:RabbitMQ]] == 持久化机制 == RabbitMQ 持久化机制分为: # '''交换器持久化''':声明交换机的时候可以通过属性设置是否需要持久化; #: <syntaxhighlight lang="Java" highlight=""> // 声明交换机:第三个参数为持久化选项 channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT, true); </syntaxhighlight> # '''队列持久化''':声明队列的时候可以设置队列是否需要持久化; #: <syntaxhighlight lang="Java" highlight=""> // 声明队列:第二个参数为持久化选项 channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); </syntaxhighlight> # '''消息持久化''':在发布消息的时候,通过消息属性可以设置,消息是否需要持久化; #: <syntaxhighlight lang="Java" highlight=""> // 发送消息:第三个参数为消息属性 //“MessageProperties.PERSISTENT_TEXT_PLAIN”:将消息标记为持久性,确保即使 RabbitMQ 重新启动,队列也不会丢失。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); </syntaxhighlight> * 队列持久化,不代表消息就是持久化的。 * 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘: ** 持久化消息会同时写入磁盘和内存(加快读取速度); ** 非持久化消息会在内存不够用时,将消息写入磁盘(Rabbitmq重启之后就没有了)。 == 消息顺序 == RabbitMQ使用过程中,有些业务场景需要我们保证顺序消费,例如:业务上产生三条消息,分别是对数据的增加、修改、删除操作,如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。 : [[File:RabbitMQ:消息顺序示例.png|400px]] RabbitMQ的消息顺序问题,需要分三个环节看待,'''发送消息的顺序'''、'''队列中消息的顺序'''、'''消费消息的顺序'''。 === 发送消息的顺序 === 消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能'''全局加锁'''一个个的操作,一个个的发消息,'''不能并发发送消息'''。 === 队列中消息的顺序 === RabbitMQ中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由Rabbitmq保证,通常也不需要开发关心。 * 不同队列中的消息顺序,是没有保证的。 === 消费消息的顺序 === 我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,开篇的示意图已经说明,虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。 : 例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。 '''解决消费顺序的问题,通常就是一个队列只有一个消费者'''。 * 这样可以按顺序处理消息,但并发能力下降了,无法并发消费消息,这是个取舍问题。 * 如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度。 *: 例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。 == 优先级队列(消息优先级) == 优先级队列,顾名思义,优先级高的消息具备优先被消费的特权。 * '''只有当消费者不足,不能及时进行消费的情况下,优先级队列才会生效'''。 * RabbitMQ '''3.5''' 版本以后才支持优先级队列。 RabbitMQ优先级队列使用步骤: # '''设置队列最大优先级''':在声明队列的时候,通过队列属性('''x-max-priority''')设置队列的最大优先级,优先级的最大值为 255,官方建议最好在 1 到 10 之间。 #: <syntaxhighlight lang="Java" highlight=""> // Java Channel ch = ...; Map<String, Object> map = new HashMap<String, Object>(); map.put("x-max-priority", 10); ch.queueDeclare(PRIORITY_QUEUE, true, false, false, map); </syntaxhighlight> #: <syntaxhighlight lang="Java" highlight=""> // SpringBoot @Bean public Queue priorityQueue() { Map<String, Object> map = new HashMap<>(); //给当前队列配置最大优先级 map.put("x-max-priority", 10); return new Queue(PRIORITY_QUEUE, true, false, false, map); } </syntaxhighlight> #* 只能通过声明方式设定,不能通过策略方式修改; # '''设置消息优先级''':通过消息属性(Priority) #: <syntaxhighlight lang="Java" highlight=""> for(int i=0;i<10;i++) { AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("priority_exchange", "rk_priority", properties, message.getBytes(StandardCharsets.UTF_8)); } </syntaxhighlight> #: <syntaxhighlight lang="Java" highlight=""> // SpringBoot amqpTemplate.convertAndSend("priority_exchange", "rk_priority", message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setPriority(finalI); return message; } }); </syntaxhighlight> '''优先级队列必须和优先级消息一起使用''',才能发挥出效果,但是会消耗性能。 == 死信队列 == 在 RabbitMQ 中,当消息在一个队列中变成一个'''死信'''('''Dead Letter''',消费者无法处理的消息)之后,它将被重新投递到另一个交换机上,这个交换机我们就叫做'''死信交换机''',死信交换机将死信投递到一个队列上就是'''死信队列'''。 : [[File:RabbitMQ:死信队列.png|800px]] * 如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。 === 产生条件 === # 消息被消费者手动拒绝(basic.reject / basic.nack),并且“requeue = false”(禁止重新入队); #: <syntaxhighlight lang="Java" highlight=""> // 拒绝消息,给死信队列 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); </syntaxhighlight> # 消息 TTL(消息存活时间)过期; #: <syntaxhighlight lang="Java" highlight=""> @Bean public Queue queue(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME); map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY); map.put("x-message-ttl",7200); // 队列过期时间 Queue queue = new Queue(QUEUE_NAME,true,false,false,map); return queue; } </syntaxhighlight> # 队列达到最大长度; #: <syntaxhighlight lang="Java" highlight=""> @Bean public Queue queue(){ Map<String,Object> map = new HashMap<>(); map.put("x-dead-letter-exchange",BEI_EXCHANGE_NAME); map.put("x-dead-letter-routing-key",BEI_ROUTING_KEY); map.put("x-max-length",3); // 队列最大长度 // map.put("x-message-ttl",7200); // 队列过期时间 Queue queue = new Queue(QUEUE_NAME,true,false,false,map); return queue; } </syntaxhighlight> === 死信处理步骤 === # '''定义死信交换机'''; # '''定义死信队列'''; # '''定义死信消费者'''; # '''将死信交换机绑定到指定队列'''; *(死信交换机、死信队列、死信消费者,其实就是普通的交换机、队列、消费者) 示例: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class DlxProducer { public static void main(String[] args) throws Exception { // 设置连接以及创建 channel String exchangeName = "test_dlx_exchange"; String routingKey = "item.update"; String msg = "this is dlx msg"; // 设置消息过期时间,10秒后再消费 让消息进入死信队列(死信条件) AMQP.BasicProperties properties = new AMQP.BasicProperties().builder() .deliveryMode(2) .expiration("10000") .build(); channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes()); System.out.println("Send message : " + msg); channel.close(); connection.close(); } } </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.*; import java.io.IOException; import java.util.HashMap; import java.util.Map; public class DlxConsumer { public static void main(String[] args) throws Exception { // 创建连接、创建 channel 忽略 内容可以在上面代码中获取 String exchangeName = "test_dlx_exchange"; String queueName = "test_dlx_queue"; String routingKey = "item.#"; // 声明交换机 channel.exchangeDeclare(exchangeName, "topic", true, false, null); // 声明队列 // 1、设置参数到 arguments 中 Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "dlx.exchange"); // 设置 x-dead-letter-exchange 属性为:dlx.exchange(死信交换机名称) // 2、将 arguments 放入队列的声明中 channel.queueDeclare(queueName, true, false, false, arguments); // 绑定 channel.queueBind(queueName, exchangeName, routingKey); // 声明死信交换机 channel.exchangeDeclare("dlx.exchange", "topic", true, false, null); // 声明死信队列 channel.queueDeclare("dlx.queue", true, false, false, null); // 路由键为 # 代表可以路由到所有消息 channel.queueBind("dlx.queue", "dlx.exchange", "#"); // 消费者 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received '" + message + "'"); } }; // 设置 Channel 消费者绑定队列 channel.basicConsume(queueName, true, consumer); // 死信消费者 Consumer dlxConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received Dead Letter'" + message + "'"); } }; // 设置 Channel 消费者绑定队列 channel.basicConsume("dlx.queue", true, dlxConsumer); } } </syntaxhighlight> == 消息过期时间(TTL) == <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight>
返回至“
RabbitMQ:高级特性
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息