RabbitMQ:SpringBoot(延迟消息、死信队列)
跳到导航
跳到搜索
延迟消息实现(死信队列 + 消息TTL)
参考:SpringBoot整合RabbitMQ实现延迟消息
场景:
- 用于解决用户下单以后,订单超时如何取消订单的问题:
- - 用户进行提交订单操作(会有锁定商品库存等操作);
- - 生成订单,获取订单的id;
- - 获取到设置的订单超时时间(假设设置的为60分钟不支付取消订单);
- - 按订单超时时间发送一个延迟消息给 RabbitMQ,让它在订单超时后触发取消订单的操作;
- - 如果用户没有支付,进行取消订单操作(释放锁定商品库存一系列操作)。
- 实现方法:需要一个订单延迟消息队列,以及一个取消订单消息队列:一旦有消息以延迟订单设置的路由键发送过来,会转发到订单延迟消息队列,并在此队列保存一定时间,等到超时后会自动将消息发送到取消订单消息消费队列。
- 短信验证码以及邮箱验证码都采用消息队列进行消费:
- 采用队列,交换机,路由键进行消费。一条队列,一个交换机,一个路由键就可以实现。
实现
- pom.xml:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
- application.yml:
# SpringBoot配置RabbitMq rabbitmq: host: localhost # rabbitmq的连接地址 port: 5672 # rabbitmq的连接端口号 virtual-host: /hanzoMall # rabbitmq的虚拟host username: hanzoMall # rabbitmq的用户名 password: hanzoMall # rabbitmq的密码 publisher-confirms: true #如果对异步消息需要回调必须设置为true
- 消息队列枚举配置:
package ltd.hanzo.mall.common; import com.rabbitmq.client.AMQP; import lombok.Getter; @Getter public enum QueueEnum { /** * 短信消息通知队列 * exchange:mall.sms.direct * queue:mall.sms.send * routeKey:mall.sms.send */ QUEUE_SMS_SEND("mall.sms.direct", "mall.sms.send", "mall.sms.send"), /** * 邮件消息通知队列 * exchange:mall.email.direct * queue:mall.email.send * routeKey:mall.email.send */ QUEUE_EMAIL_SEND("mall.email.direct", "mall.email.send", "mall.email.send"), /** * “订单取消”消息通知队列 * exchange:mall.order.direct * queue:mall.order.cancel * routeKey:mall.order.cancel */ QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"), /** * “订单延迟”消息通知队列 * exchange:mall.order.direct.ttl * queue:mall.order.cancel.ttl * routeKey:mall.order.cancel.ttl * 订单消息会被转发到此队列,并在此队列保存一定时间,等到超时后会自动将消息发送到 mall.order.cancel(取消订单消息消费队列)。 */ QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl"); /** * 交换机名称 */ private String exchange; /** * 队列名称 */ private String name; /** * 路由键 */ private String routeKey; QueueEnum(String exchange, String name, String routeKey) { this.exchange = exchange; this.name = name; this.routeKey = routeKey; } }
- RabbitMQ 配置类:
package ltd.hanzo.mall.config; import ltd.hanzo.mall.common.QueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; */ @Configuration public class RabbitMqConfig { /* --------------------------------------------------------1、短信消息队列------------------------------------------------------- */ /** * 交换机 */ @Bean DirectExchange sendSmsDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_SMS_SEND.getExchange()) .durable(true) .build(); } /** * 队列 */ @Bean public Queue sendSmsQueue() { return new Queue(QueueEnum.QUEUE_SMS_SEND.getName()); } /** * 绑定 */ @Bean Binding sendSmsBinding(DirectExchange sendSmsDirect, Queue sendSmsQueue){ return BindingBuilder .bind(sendSmsQueue) .to(sendSmsDirect) .with(QueueEnum.QUEUE_SMS_SEND.getRouteKey()); } /* --------------------------------------------------------2、邮件消息队列------------------------------------------------------- */ /** * 交换机 */ @Bean DirectExchange sendEmailDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_EMAIL_SEND.getExchange()) .durable(true) .build(); } /** * 队列 */ @Bean public Queue sendEmailQueue() { return new Queue(QueueEnum.QUEUE_EMAIL_SEND.getName()); } /** * 绑定 */ @Bean Binding sendEmailBinding(DirectExchange sendEmailDirect, Queue sendEmailQueue){ return BindingBuilder .bind(sendEmailQueue) .to(sendEmailDirect) .with(QueueEnum.QUEUE_EMAIL_SEND.getRouteKey()); } /* --------------------------------------------------------3、订单取消队列------------------------------------------------------- */ /** * 交换机 */ @Bean DirectExchange orderDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 队列 */ @Bean public Queue orderQueue() { return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName()); } /** * 绑定 */ @Bean Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){ return BindingBuilder .bind(orderQueue) .to(orderDirect) .with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()); } /* --------------------------------------------------------4、订单延迟队列------------------------------------------------------- */ /** * 交换机 */ @Bean DirectExchange orderTtlDirect() { return (DirectExchange) ExchangeBuilder .directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()) .durable(true) .build(); } /** * 队列(死信队列) */ @Bean public Queue orderTtlQueue() { return QueueBuilder .durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()) .withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange()) // 到期后转发的交换机 .withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey()) // 到期后转发的路由键 .build(); } /** * 绑定 */ @Bean Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){ return BindingBuilder .bind(orderTtlQueue) .to(orderTtlDirect) .with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey()); } }
- “订单延迟”发送者:向订单延迟消息队列(mall.order.cancel.ttl)里发送消息
package ltd.hanzo.mall.component; import lombok.extern.slf4j.Slf4j; import ltd.hanzo.mall.common.QueueEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @Slf4j public class CancelOrderSender { @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(String orderNo,final long delayTimes){ // 给延迟队列发送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderNo, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { // 给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); log.info("send delay message orderNo:{}",orderNo); } }
- “订单延迟”队列(mall.order.cancel.ttl)中消息过期之后就会被转发到达“订单取消”队列(mall.order.cancel);
- “订单取消”接收者:用于从取消订单的消息队列(mall.order.cancel)里接收消息
package ltd.hanzo.mall.component; import lombok.extern.slf4j.Slf4j; import ltd.hanzo.mall.service.HanZoMallOrderService; import ltd.hanzo.mall.service.TaskService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "mall.order.cancel") @Slf4j public class CancelOrderReceiver { @Autowired private HanZoMallOrderService hanZoMallOrderService; @Autowired private TaskService taskService; @RabbitHandler public void handle(String orderNo){ log.info("receive delay message orderNo:{}",orderNo); hanZoMallOrderService.cancelOrder(orderNo); taskService.cancelOrderSendSimpleMail(orderNo); } }
- HanZoMallOrderService接口:创建订单,取消超时订单
public interface HanZoMallOrderService { /** * 保存订单 * * @param user * @param myShoppingCartItems * @return */ String saveOrder(HanZoMallUserVO user, List<HanZoMallShoppingCartItemVO> myShoppingCartItems); /** * 取消单个超时订单 */ @Transactional void cancelOrder(String orderNo); }
- HanZoMallOrderServiceImpl实现类:实现 HanZoMallOrderService 接口
@Slf4j @Service public class HanZoMallOrderServiceImpl implements HanZoMallOrderService { @Resource private HanZoMallOrderMapper hanZoMallOrderMapper; @Resource private HanZoMallOrderItemMapper hanZoMallOrderItemMapper; @Resource private HanZoMallShoppingCartItemMapper hanZoMallShoppingCartItemMapper; @Resource private HanZoMallGoodsMapper hanZoMallGoodsMapper; @Autowired private CancelOrderSender cancelOrderSender; @Override @Transactional public String saveOrder(HanZoMallUserVO user, List<HanZoMallShoppingCartItemVO> myShoppingCartItems) { // todo 执行一系类下单操作,代码在github中 // 下单完成后开启一个延迟消息,用于当用户没有付款时取消订单 sendDelayMessageCancelOrder(orderNo); // 所有操作成功后,将订单号返回,以供Controller方法跳转到订单详情 return orderNo; } @Override public void cancelOrder(String orderNo) { HanZoMallOrder hanZoMallOrder = hanZoMallOrderMapper.selectByOrderNo(orderNo); if (hanZoMallOrder != null && hanZoMallOrder.getOrderStatus() == 0) { // 超时取消订单 hanZoMallOrderMapper.closeOrder(Collections.singletonList(hanZoMallOrder.getOrderId()), HanZoMallOrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus()); } } private void sendDelayMessageCancelOrder(String orderNo) { // 获取订单超时时间,假设为60分钟 long delayTimes = 36 * 100000; // 发送延迟消息 cancelOrderSender.sendMessage(orderNo, delayTimes); } }
死信队列实现
死信队列可以实现消息在未被正常消费的场景下,对这些消息进行其他处理,保证消息不会被丢弃。
死信场景:
- 消息被消费者拒绝签收,并且重新入队为false:(basic.reject() / basic.nack())and requeue = false。
- 注意:消费者设置了自动 ACK,当重复投递次数达到了设置的最大 retry 次数之后,消息也会投递到死信队列,但是内部的原理还是调用了 nack/reject。
- 消息过期,过了 TTL 存活时间。
- 队列设置了 x-max-length 最大消息数量且当前队列中的消息已经达到了这个数量,再次投递,消息将被挤掉,被挤掉的是最靠近被消费那一端的消息。
实现
- 正常业务消息被投递到正常业务的Exchange,该Exchange根据路由键将消息路由到绑定的正常队列。
- 正常业务队列中的消息变成了死信消息之后,会被自动投递到该队列绑定的死信交换机上(并带上配置的路由键,如果没有指定死信消息的路由键,则默认继承该消息在正常业务时设定的路由键)。
- 死信交换机收到消息后,将消息根据路由规则路由到指定的死信队列。
- 消息到达死信队列后,可监听该死信队列,处理死信消息。
- application.yml:
spring: application: name: learn-rabbitmq rabbitmq: host: localhost port: 5672 username: futao password: 123456789 virtual-host: deadletter-vh connection-timeout: 15000 # 发送确认 publisher-confirms: true # 路由失败回调 publisher-returns: true template: # 必须设置成true 消息路由失败通知监听者,而不是将消息丢弃 mandatory: true listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 签收模式为手动签收-那么需要在代码中手动ACK acknowledge-mode: manual app: rabbitmq: # 队列定义 queue: # 正常业务队列 user: user-queue # 死信队列 user-dead-letter: user-dead-letter-queue # 交换机定义 exchange: # 正常业务交换机 user: user-exchange # 死信交换机 common-dead-letter: common-dead-letter-exchange
- RabbitMQ 配置类:
@Configuration public class Declare { /* --------------------------------------------------------1、用户消息队列------------------------------------------------------- */ /** * 用户交换机 * * @param userExchangeName 用户交换机名 * @return */ @Bean public Exchange userExchange(@Value("${app.rabbitmq.exchange.user}") String userExchangeName) { return ExchangeBuilder .topicExchange(userExchangeName) .durable(true) .build(); } /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") .build(); } /** * 用户队列与交换机绑定 * * @param userQueue 用户队列名 * @param userExchange 用户交换机名 * @return */ @Bean public Binding userBinding(Queue userQueue, Exchange userExchange) { return BindingBuilder .bind(userQueue) .to(userExchange) .with("user.*") .noargs(); } /* --------------------------------------------------------2、死信消息队列------------------------------------------------------- */ /** * 死信交换机 * * @param commonDeadLetterExchange 通用死信交换机名 * @return */ @Bean public Exchange commonDeadLetterExchange(@Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return ExchangeBuilder .topicExchange(commonDeadLetterExchange) .durable(true) .build(); } /** * 死信交换机 * * 用这个队列来接收 user-queue 的死信消息 * 用户队列 user-queue 的死信投递到死信交换机`common-dead-letter-exchange`后再投递到该队列 * @return */ @Bean public Queue userDeadLetterQueue(@Value("${app.rabbitmq.queue.user-dead-letter}") String userDeadLetterQueue) { return QueueBuilder .durable(userDeadLetterQueue) .build(); } /** * 死信队列绑定死信交换机 * * @param userDeadLetterQueue user-queue对应的死信队列 * @param commonDeadLetterExchange 通用死信交换机 * @return */ @Bean public Binding userDeadLetterBinding(Queue userDeadLetterQueue, Exchange commonDeadLetterExchange) { return BindingBuilder .bind(userDeadLetterQueue) .to(commonDeadLetterExchange) .with("user-dead-letter-routing-key") .noargs(); } }
- 生产者:
@Component public class DeadLetterSender { @Autowired private RabbitTemplate rabbitTemplate; @Value("${app.rabbitmq.exchange.user}") private String userExchange; public void send() { User user = User.builder() .userName("天文") .address("浙江杭州") .birthday(LocalDate.now(ZoneOffset.ofHours(8))) .build(); rabbitTemplate.convertAndSend(userExchange, "user.abc", user); } }
场景一:(basic.reject() / basic.nack())and requeue = false
消息被(basic.reject() or basic.nack()) and requeue = false,即消息被消费者拒绝或者nack,并且重新入队为false。
- nack()支持批量确认,而reject()不支持。
- 消费者:
@Slf4j @Component public class Consumer { /** * 正常用户队列消息监听消费者 * * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user, Message message, Channel channel) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); try { //参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("消息拒绝签收失败", e); } } /** * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user, Message message, Channel channel) { log.info("接收到死信消息:[{}]", JSON.toJSONString(user)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("死信队列消息签收失败", e); } } }
(autoACK and 重复投递次数>retry)
- application.yml:
spring: application: name: learn-rabbitmq rabbitmq: . . . listener: simple: # 每次从RabbitMQ获取的消息数量 prefetch: 1 default-requeue-rejected: false # 每个队列启动的消费者数量 concurrency: 1 # 每个队列最大的消费者数量 max-concurrency: 1 # 自动签收 acknowledge-mode: auto retry: enabled: true # 第一次尝试时间间隔 initial-interval: 10S # 两次尝试之间的最长持续时间。 max-interval: 10S # 最大重试次数(=第一次正常投递1+重试次数4) max-attempts: 5 # 上一次重试时间的乘数 multiplier: 1.0 . . .
- 消费者:
@Slf4j @Configuration public class AutoAckConsumer { /** * 正常用户队列消息监听消费者 * * @param user */ @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); throw new RuntimeException("模拟发生异常"); } /** * @param user */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user) { log.info("接收到死信消息并自动签收:[{}]", JSON.toJSONString(user)); } }
场景二:消息过期
消息过期,过了TTL存活时间。
- RabbitMQ 配置类:设置队列消息的过期时间 x-message-ttl;
. . . /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) // 声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) // 声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") // 该队列的消息的过期时间-超过这个时间还未被消费则路由到死信队列 .withArgument("x-message-ttl", 5000) .build(); } . . .
- 生产者:为每条消息设定过期时间
. . . log.info("消息投递...指定的存活时长为:[{}]ms", exp); rabbitTemplate.convertAndSend(userExchange, "user.abc", user, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties messageProperties = message.getMessageProperties(); //为每条消息设定过期时间 messageProperties.setExpiration(exp); return message; } }); . . .
- 消费者:user-queue的消费者注释,使消息无法被消费
@Slf4j @Component public class Consumer { /** * 正常用户队列消息监听消费者 * * @param user * @param message * @param channel @RabbitListener(queues = "${app.rabbitmq.queue.user}") public void userConsumer(User user, Message message, Channel channel) { log.info("正常用户业务监听:接收到消息:[{}]", JSON.toJSONString(user)); try { //参数为:消息的DeliveryTag,是否批量拒绝,是否重新入队 channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); log.info("拒绝签收...消息的路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("消息拒绝签收失败", e); } } */ /** * @param user * @param message * @param channel */ @RabbitListener(queues = "${app.rabbitmq.queue.user-dead-letter}") public void userDeadLetterConsumer(User user, Message message, Channel channel) { log.info("接收到死信消息:[{}]", JSON.toJSONString(user)); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); log.info("死信队列签收消息....消息路由键为:[{}]", message.getMessageProperties().getReceivedRoutingKey()); } catch (IOException e) { log.error("死信队列消息签收失败", e); } } }
场景三:队列达到最大消息数量(x-max-length)
- RabbitMQ 配置类:为队列设置最大消息数量x-max-length
. . . /** * 用户队列 * * @param userQueueName 用户队列名 * @return */ @Bean public Queue userQueue(@Value("${app.rabbitmq.queue.user}") String userQueueName, @Value("${app.rabbitmq.exchange.common-dead-letter}") String commonDeadLetterExchange) { return QueueBuilder .durable(userQueueName) //声明该队列的死信消息发送到的 交换机 (队列添加了这个参数之后会自动与该交换机绑定,并设置路由键,不需要开发者手动设置) .withArgument("x-dead-letter-exchange", commonDeadLetterExchange) //声明该队列死信消息在交换机的 路由键 .withArgument("x-dead-letter-routing-key", "user-dead-letter-routing-key") //队列最大消息数量 .withArgument("x-max-length", 2) .build(); } . . .
- 生产者:向队列中投递多条消息;
- 当投递第 3 条消息的时候,RabbitMQ 会把在最靠近被消费那一端的消息移出队列,并投递到死信队列。