查看“RabbitMQ:SpringBoot(延迟消息、死信队列)”的源代码
←
RabbitMQ:SpringBoot(延迟消息、死信队列)
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:RabbitMQ]] [[category:SpringBoot]] __TOC__ == 延迟消息实现(死信队列 + 消息TTL) == 参考:'''[https://zhuanlan.zhihu.com/p/147670269 SpringBoot整合RabbitMQ实现延迟消息]''' 场景: # 用于解决用户下单以后,订单超时如何取消订单的问题: ## - 用户进行提交订单操作(会有锁定商品库存等操作); ## - 生成订单,获取订单的id; ## - 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单); ## - 按订单超时时间发送一个延迟消息给 RabbitMQ,让它在订单超时后触发取消订单的操作; ## - 如果用户没有支付,进行取消订单操作(释放锁定商品库存一系列操作)。 #: 实现方法:需要一个订单延迟消息队列,以及一个取消订单消息队列:一旦有消息以延迟订单设置的路由键发送过来,会转发到订单延迟消息队列,并在此队列保存一定时间,等到超时后会自动将消息发送到取消订单消息消费队列。 # 短信验证码以及邮箱验证码都采用消息队列进行消费: #: 采用队列,交换机,路由键进行消费。一条队列,一个交换机,一个路由键就可以实现。 === 实现 === # '''pom.xml''': #: <syntaxhighlight lang="xml" highlight=""> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </syntaxhighlight> # '''application.yml''': #: <syntaxhighlight lang="yaml" highlight=""> # SpringBoot配置RabbitMq rabbitmq: host: localhost # rabbitmq的连接地址 port: 5672 # rabbitmq的连接端口号 virtual-host: /hanzoMall # rabbitmq的虚拟host username: hanzoMall # rabbitmq的用户名 password: hanzoMall # rabbitmq的密码 publisher-confirms: true #如果对异步消息需要回调必须设置为true </syntaxhighlight> # '''消息队列枚举配置''': #: <syntaxhighlight lang="Java" highlight=""> package ltd.hanzo.mall.common; import com.rabbitmq.client.AMQP; import lombok.Getter; @Getter public enum QueueEnum { /** * 短信消息通知队列 * exchange:mall.sms.direct * queue:mall.sms.send * routeKey:mall.sms.send */ QUEUE_SMS_SEND("mall.sms.direct", "mall.sms.send", "mall.sms.send"), /** * 邮件消息通知队列 * exchange:mall.email.direct * queue:mall.email.send * routeKey:mall.email.send */ QUEUE_EMAIL_SEND("mall.email.direct", "mall.email.send", "mall.email.send"), /** * “订单取消”消息通知队列 * exchange:mall.order.direct * queue:mall.order.cancel * routeKey:mall.order.cancel */ QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"), /** * “订单延迟”消息通知队列 * exchange:mall.order.direct.ttl * queue:mall.order.cancel.ttl * routeKey:mall.order.cancel.ttl * 订单消息会被转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到 mall.order.cancel(取消订单消息消费队列)。 */ QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl"); /** * 交换机名称 */ private String exchange; /** * 队列名称 */ private String name; /** * 路由键 */ private String routeKey; QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } } </syntaxhighlight> # '''RabbitMQ 配置类''': #: <syntaxhighlight lang="Java" highlight=""> package ltd.hanzo.mall.config; import ltd.hanzo.mall.common.QueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; */ @Configuration public class RabbitMqConfig { /* --------------------------------------------------------1、短信消息队列------------------------------------------------------- */ /** * 交换机 */ @Bean DirectExchange sendSmsDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_SMS_SEND.getExchange()) .durable(true) .build(); } /** * 队列 */ @Bean public Queue sendSmsQueue() { return new Queue(QueueEnum.QUEUE_SMS_SEND.getName()); } /** * 绑定 */ @Bean Binding sendSmsBinding(DirectExchange sendSmsDirect, Queue sendSmsQueue){ return BindingBuilder .bind(sendSmsQueue) .to(sendSmsDirect) .with(QueueEnum.QUEUE_SMS_SEND.getRouteKey()); } /* --------------------------------------------------------2、邮件消息队列------------------------------------------------------- */ /** * 交换机 */ @Bean DirectExchange sendEmailDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_EMAIL_SEND.getExchange()) .durable(true) .build(); } /** * 队列 */ @Bean public Queue sendEmailQueue() { return new Queue(QueueEnum.QUEUE_EMAIL_SEND.getName()); } /** * 绑定 */ @Bean Binding sendEmailBinding(DirectExchange sendEmailDirect, Queue sendEmailQueue){ return BindingBuilder .bind(sendEmailQueue) .to(sendEmailDirect) .with(QueueEnum.QUEUE_EMAIL_SEND.getRouteKey()); } /* --------------------------------------------------------3、订单取消队列------------------------------------------------------- */ /** * 交换机 */ @Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 队列 */ @Bean public Queue orderQueue() { return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 绑定 */ @Bean Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /* --------------------------------------------------------4、订单延迟队列------------------------------------------------------- */ /** * 交换机 */ @Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 队列(死信队列) */ @Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) // 到期后转发的交换机 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()) // 到期后转发的路由键 .build(); } /** * 绑定 */ @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } } </syntaxhighlight> # '''“订单延迟”发送者''':向订单延迟消息队列(mall.order.cancel.ttl)里发送消息 #: <syntaxhighlight lang="Java" highlight=""> package ltd.hanzo.mall.component; import lombok.extern.slf4j.Slf4j; import ltd.hanzo.mall.common.QueueEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @Slf4j public class CancelOrderSender { @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(String orderNo,final long delayTimes){ // 给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderNo, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); log.info("send delay message orderNo:{}",orderNo); } } </syntaxhighlight> #* “订单延迟”队列(mall.order.cancel.ttl)中消息过期之后就会被转发到达“订单取消”队列(mall.order.cancel); # '''“订单取消”接收者''':用于从取消订单的消息队列(mall.order.cancel)里接收消息 #: <syntaxhighlight lang="Java" highlight=""> package ltd.hanzo.mall.component; import lombok.extern.slf4j.Slf4j; import ltd.hanzo.mall.service.HanZoMallOrderService; import ltd.hanzo.mall.service.TaskService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "mall.order.cancel") @Slf4j public class CancelOrderReceiver { @Autowired private HanZoMallOrderService hanZoMallOrderService; @Autowired private TaskService taskService; @RabbitHandler public void handle(String orderNo){ log.info("receive delay message orderNo:{}",orderNo); hanZoMallOrderService.cancelOrder(orderNo); taskService.cancelOrderSendSimpleMail(orderNo); } } </syntaxhighlight> # '''HanZoMallOrderService接口''':创建订单,取消超时订单 #: <syntaxhighlight lang="Java" highlight=""> public interface HanZoMallOrderService { /** * 保存订单 * * @param user * @param myShoppingCartItems * @return */ String saveOrder(HanZoMallUserVO user, List<HanZoMallShoppingCartItemVO> myShoppingCartItems); /** * 取消单个超时订单 */ @Transactional void cancelOrder(String orderNo); } </syntaxhighlight> # '''HanZoMallOrderServiceImpl实现类''':实现 HanZoMallOrderService 接口 #: <syntaxhighlight lang="Java" highlight=""> @Slf4j @Service public class HanZoMallOrderServiceImpl implements HanZoMallOrderService { @Resource private HanZoMallOrderMapper hanZoMallOrderMapper; @Resource private HanZoMallOrderItemMapper hanZoMallOrderItemMapper; @Resource private HanZoMallShoppingCartItemMapper hanZoMallShoppingCartItemMapper; @Resource private HanZoMallGoodsMapper hanZoMallGoodsMapper; @Autowired private CancelOrderSender cancelOrderSender; @Override @Transactional public String saveOrder(HanZoMallUserVO user, List<HanZoMallShoppingCartItemVO> myShoppingCartItems) { // todo 执行一系类下单操作,代码在github中 // 下单完成后开启一个延迟消息,用于当用户没有付款时取消订单 sendDelayMessageCancelOrder(orderNo); // 所有操作成功后,将订单号返回,以供Controller方法跳转到订单详情 return orderNo; } @Override public void cancelOrder(String orderNo) { HanZoMallOrder hanZoMallOrder = hanZoMallOrderMapper.selectByOrderNo(orderNo); if (hanZoMallOrder != null && hanZoMallOrder.getOrderStatus() == 0) { // 超时取消订单 hanZoMallOrderMapper.closeOrder(Collections.singletonList(hanZoMallOrder.getOrderId()), HanZoMallOrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus()); } } private void sendDelayMessageCancelOrder(String orderNo) { // 获取订单超时时间,假设为60分钟 long delayTimes = 36 * 100000; // 发送延迟消息 cancelOrderSender.sendMessage(orderNo, delayTimes); } } </syntaxhighlight> == 死信队列实现 == 死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃。 死信场景: # 消息被消费者拒绝签收,并且重新入队为false:(basic.reject() / basic.nack())and requeue = false。 #* 注意:消费者设置了自动 ACK,当重复投递次数达到了设置的最大 retry 次数之后,消息也会投递到死信队列,但是内部的原理还是调用了 nack/reject。 # 消息过期,过了 '''TTL''' 存活时间。 # 队列设置了 '''x-max-length''' 最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。 === 实现 === * 参考:'''[https://zhuanlan.zhihu.com/p/132446860 RabbitMQ死信队列在SpringBoot中的使用]''' : [[File:RabbitMQ:死信队列示例.png|800px]] # 正常业务消息被投递到正常业务的Exchange,该Exchange根据路由键将消息路由到绑定的正常队列。 # 正常业务队列中的消息变成了死信消息之后,会被自动投递到该队列绑定的死信交换机上(并带上配置的路由键,'''如果没有指定死信消息的路由键,则默认继承该消息在正常业务时设定的路由键''')。 # 死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列。 # 消息到达死信队列后,可监听该死信队列,处理死信消息。 # '''application.yml''': #: <syntaxhighlight lang="yaml" highlight=""> spring: application: name: learn-rabbitmq rabbitmq: host: localhost port: 5672 username: futao password: 123456789 virtual-host: deadletter-vh connection-timeout: 15000 # 发送确认 publisher-confirms: true # 路由失败回调 publisher-returns: true template: # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃 mandatory: true listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 签收模式为手动签收-那么需要在代码中手动ACK acknowledge-mode: manual app: rabbitmq: # 队列定义 queue: # 正常业务队列 user: user-queue # 死信队列 user-dead-letter: user-dead-letter-queue # 交换机定义 exchange: # 正常业务交换机 user: user-exchange # 死信交换机 common-dead-letter: common-dead-letter-exchange </syntaxhighlight> # '''RabbitMQ 配置类''': #: <syntaxhighlight lang="Java" highlight=""> @Configuration public class Declare { /* --------------------------------------------------------1、用户消息队列------------------------------------------------------- */ /** * 用户交换机 * * @param userExchangeName 用户交换机名 * @return */ @Bean public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) { return ExchangeBuilder .topicExchange(userExchangeName) .durable(true) .build(); } /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") .build(); } /** * 用户队列与交换机绑定 * * @param userQueue 用户队列名 * @param userExchange 用户交换机名 * @return */ @Bean public Binding userBinding(Queue userQueue, Exchange userExchange) { return BindingBuilder .bind(userQueue) .to(userExchange) .with("user.*") .noargs(); } /* --------------------------------------------------------2、死信消息队列------------------------------------------------------- */ /** * 死信交换机 * * @param commonDeadLetterExchange 通用死信交换机名 * @return */ @Bean public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return ExchangeBuilder .topicExchange(commonDeadLetterExchange) .durable(true) .build(); } /** * 死信交换机 * * 用这个队列来接收 user-queue 的死信消息 * 用户队列 user-queue 的死信投递到死信交换机`common-dead-letter-exchange`后再投递到该队列 * @return */ @Bean public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) { return QueueBuilder .durable(userDeadLetterQueue) .build(); } /** * 死信队列绑定死信交换机 * * @param userDeadLetterQueue user-queue对应的死信队列 * @param commonDeadLetterExchange 通用死信交换机 * @return */ @Bean public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) { return BindingBuilder .bind(userDeadLetterQueue) .to(commonDeadLetterExchange) .with("user-dead-letter-routing-key") .noargs(); } } </syntaxhighlight> # '''生产者''': #: <syntaxhighlight lang="Java" highlight=""> @Component public class DeadLetterSender { @Autowired private RabbitTemplate rabbitTemplate; @Value("${app.rabbitmq.exchange.user}") private String userExchange; public void send() { User user = User.builder() .userName("天文") .address("浙江杭州") .birthday(LocalDate.now(ZoneOffset.ofHours(8))) .build(); rabbitTemplate.convertAndSend(userExchange, "user.abc", user); } } </syntaxhighlight> ==== 场景一:(basic.reject() / basic.nack())and requeue = false ==== 消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝或者nack,并且重新入队为false。 * nack()支持批量确认,而reject()不支持。 # '''消费者''': #: <syntaxhighlight lang="Java" highlight=""> @Slf4j @Component public class Consumer { /** * 正常用户队列消息监听消费者 * * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user, Message message, Channel channel) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); try { //参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("消息拒绝签收失败", e); } } /** * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user, Message message, Channel channel) { log.info("接收到死信消息:[{}]", JSON.toJSONString(user)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("死信队列消息签收失败", e); } } } </syntaxhighlight> ===== (autoACK and 重复投递次数>retry) ===== # '''application.yml''': #: <syntaxhighlight lang="yaml" highlight=""> spring: application: name: learn-rabbitmq rabbitmq: . . . listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 自动签收 acknowledge-mode: auto retry: enabled: true # 第一次尝试时间间隔 initial-interval: 10S # 两次尝试之间的最长持续时间。 max-interval: 10S # 最大重试次数(=第一次正常投递1+重试次数4) max-attempts: 5 # 上一次重试时间的乘数 multiplier: 1.0 . . . </syntaxhighlight> # '''消费者''': #: <syntaxhighlight lang="Java" highlight=""> @Slf4j @Configuration public class AutoAckConsumer { /** * 正常用户队列消息监听消费者 * * @param user */ @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); throw new RuntimeException("模拟发生异常"); } /** * @param user */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user) { log.info("接收到死信消息并自动签收:[{}]", JSON.toJSONString(user)); } } </syntaxhighlight> ==== 场景二:消息过期 ==== 消息过期,过了TTL存活时间。 # '''RabbitMQ 配置类''':设置队列消息的过期时间 '''x-message-ttl'''; #: <syntaxhighlight lang="Java" highlight=""> . . . /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) // 声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) // 声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") // 该队列的消息的过期时间-超过这个时间还未被消费则路由到死信队列 .withArgument("x-message-ttl", 5000) .build(); } . . . </syntaxhighlight> # '''生产者''':为每条消息设定过期时间 #: <syntaxhighlight lang="Java" highlight=""> . . . log.info("消息投递...指定的存活时长为:[{}]ms", exp); rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); //为每条消息设定过期时间 messageProperties.setExpiration(exp); return message; } }); . . . </syntaxhighlight> # '''消费者''':user-queue的消费者注释,使消息无法被消费 #: <syntaxhighlight lang="Java" highlight=""> @Slf4j @Component public class Consumer { /** * 正常用户队列消息监听消费者 * * @param user * @param message * @param channel @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user, Message message, Channel channel) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); try { //参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("消息拒绝签收失败", e); } } */ /** * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user, Message message, Channel channel) { log.info("接收到死信消息:[{}]", JSON.toJSONString(user)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("死信队列消息签收失败", e); } } } </syntaxhighlight> ==== 场景三:队列达到最大消息数量(x-max-length) ==== # '''RabbitMQ 配置类''':为队列设置最大消息数量'''x-max-length''' #: <syntaxhighlight lang="Java" highlight=""> . . . /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") //队列最大消息数量 .withArgument("x-max-length", 2) .build(); } . . . </syntaxhighlight> # '''生产者''':向队列中投递多条消息; #: 当投递第 3 条消息的时候,RabbitMQ 会把在最靠近被消费那一端的消息移出队列,并投递到死信队列。
返回至“
RabbitMQ:SpringBoot(延迟消息、死信队列)
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息