“RabbitMQ:SpringBoot集成”的版本间差异
		
		
		
		
		
		跳到导航
		跳到搜索
		
				
		
		
	
|  (→配置队列) | 无编辑摘要 | ||
| (未显示同一用户的47个中间版本) | |||
| 第1行: | 第1行: | ||
| [[category:RabbitMQ]] | [[category:RabbitMQ]] | ||
| [[category:SpringBoot]] | |||
| ==  | == 关于:SpringBoot 集成 RabbitMQ == | ||
| ===  | === 依赖 === | ||
| <syntaxhighlight lang="xml" highlight=""> | <syntaxhighlight lang="xml" highlight=""> | ||
| <dependency> | <dependency> | ||
|      < |      <groupId>org.springframework.boot</groupId> | ||
|      < |      <artifactId>spring-boot-starter-amqp</artifactId> | ||
| </dependency> | </dependency> | ||
| </syntaxhighlight> | </syntaxhighlight> | ||
| ===  | === RabbitMQ 配置 === | ||
| ('''application.yml'''中) | |||
| <syntaxhighlight lang=" | |||
| 简单配置: | |||
| <syntaxhighlight lang="yaml" highlight=""> | |||
| spring: | |||
|   rabbitmq: | |||
|     host: 127.0.0.1 #ip | |||
|     port: 5672      #端口 | |||
|     username: guest #账号 | |||
|     password: guest #密码 | |||
| </syntaxhighlight> | |||
| ==== 全量配置 ==== | |||
| 全量配置说明: | |||
| <syntaxhighlight lang="yaml" highlight=""> | |||
| spring: | spring: | ||
|    rabbitmq: |    rabbitmq: | ||
|      host: 127.0.0.1 #ip | |||
|      host:  |      port: 5672      #端口 | ||
|      port: 5672 |      username: guest #账号 | ||
|      username: guest |      password: guest #密码 | ||
|      password: guest |     virtualHost:    #链接的虚拟主机 | ||
|     addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。 | |||
|     requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s | |||
|     publisherConfirms: true  #发布确认机制是否启用 | |||
|     publisherReturns: #发布返回是否启用 | |||
|     connectionTimeout: #链接超时。单位ms。0表示无穷大不超时 | |||
|     ### ssl相关 | |||
|     ssl: | |||
|       enabled: #是否支持ssl | |||
|       keyStore: #指定持有SSL certificate的key store的路径 | |||
|       keyStoreType: #key store类型 默认PKCS12 | |||
|       keyStorePassword: #指定访问key store的密码 | |||
|       trustStore: #指定持有SSL certificates的Trust store | |||
|       trustStoreType: #默认JKS | |||
|       trustStorePassword: #访问密码 | |||
|       algorithm: #ssl使用的算法,例如,TLSv1.1 | |||
|       verifyHostname: #是否开启hostname验证 | |||
|     ### cache相关 | |||
|     cache: | |||
|       channel:  | |||
|         size: #缓存中保持的channel数量 | |||
|         checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel | |||
|       connection: | |||
|         mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION | |||
|         size: #缓存的连接数,只有是CONNECTION模式时生效 | |||
|     ### listener | |||
|     listener: | |||
|        type: #两种类型,SIMPLE,DIRECT | |||
|        ## simple类型 | |||
|        simple: | |||
|          concurrency: #最小消费者数量 | |||
|          maxConcurrency: #最大的消费者数量 | |||
|          transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量 | |||
|          missingQueuesFatal: #是否停止容器当容器中的队列不可用 | |||
|          ## 与direct相同配置部分 | |||
|          autoStartup: #是否自动启动容器 | |||
|          acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto | |||
|          prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量 | |||
|          defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) | |||
|          idleEventInterval: #container events发布频率,单位ms | |||
|          ##重试机制 | |||
|          retry:  | |||
|            stateless: #有无状态 | |||
|            enabled:  #是否开启 | |||
|            maxAttempts: #最大重试次数,默认3 | |||
|            initialInterval: #重试间隔 | |||
|            multiplier: #对于上一次重试的乘数 | |||
|            maxInterval: #最大重试时间间隔 | |||
|        direct: | |||
|          consumersPerQueue: #每个队列消费者数量 | |||
|          missingQueuesFatal: | |||
|          #...其余配置看上方公共配置 | |||
|      ## template相关 | |||
|      template: | |||
|        mandatory: #是否启用强制信息;默认false | |||
|        receiveTimeout: #`receive()`接收方法超时时间 | |||
|        replyTimeout: #`sendAndReceive()`超时时间 | |||
|        exchange: #默认的交换机 | |||
|        routingKey: #默认的路由 | |||
|        defaultReceiveQueue: #默认的接收队列 | |||
|        ## retry重试相关 | |||
|        retry:  | |||
|          enabled: #是否开启 | |||
|          maxAttempts: #最大重试次数 | |||
|          initialInterval: #重试间隔 | |||
|          multiplier: #失败间隔乘数 | |||
|          maxInterval: #最大间隔 | |||
| </syntaxhighlight> | </syntaxhighlight> | ||
| ==  | ==== 属性区别:mandatory、publisher-confirms、publisher-return ==== | ||
| rabbitmq客户端发送消息首先发送的交换器 exchange,然后通过路由键 routingKey 和 bindingKey 比较判定需要将消息发送到那个队列 queue 上。 | |||
| 在这个过程有两个地方消息可能丢失: | |||
| # 消息发送到交换器 exchange 的过程, | |||
| # 消息从交换器 exchange 发送到队列 queue 的过程; | |||
| 这三个属性分别用于保证以上两个过程: | |||
| # '''publisher-confirms''':可以确保生产者到交换器 exchange 消息有没有发送成功; | |||
| # '''publisher-return''':可以在消息没有被路由到指定的 queue 时将消息返回,而不是丢弃; | |||
| # '''mandatory''':指定消息在没有被队列接收时是否强行退回还是直接丢弃; | |||
| * publisher-return 通常会和 mandatory 属性配合一起使用。 | |||
| publisher-return、mandatory 都是指定未找到合适队列时将消息退回,各自的作用可以从 RabbitAutoConfiguration 自动化配置类中看清楚: | |||
| <syntaxhighlight lang="Java" highlight=""> | |||
|         @Bean | |||
|         @ConditionalOnSingleCandidate(ConnectionFactory.class) | |||
|         @ConditionalOnMissingBean | |||
|         public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { | |||
|             PropertyMapper map = PropertyMapper.get(); | |||
|             RabbitTemplate template = new RabbitTemplate(connectionFactory); | |||
|             MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique(); | |||
|             if (messageConverter != null) { | |||
|                 template.setMessageConverter(messageConverter); | |||
|             } | |||
|             // 设置 rabbitmq 处理未被 queue 接收消息的模式 | |||
|             template.setMandatory(this.determineMandatoryFlag()); | |||
|             Template properties = this.properties.getTemplate(); | |||
|             if (properties.getRetry().isEnabled()) { | |||
|                 template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))) | |||
|                                      .createRetryTemplate(properties.getRetry(), Target.SENDER)); | |||
|             } | |||
|             properties.getClass(); | |||
|             map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout); | |||
|             properties.getClass(); | |||
|             map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout); | |||
|             properties.getClass(); | |||
|             map.from(properties::getExchange).to(template::setExchange); | |||
|             properties.getClass(); | |||
|             map.from(properties::getRoutingKey).to(template::setRoutingKey); | |||
|             properties.getClass(); | |||
|             map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue); | |||
|             return template; | |||
|         } | |||
|         // 判定是否将未找到合适 queue 的消息退回 | |||
|         private boolean determineMandatoryFlag() { | |||
|           	/** | |||
|               * 获取 spring.rabbitmq.template.mandatory 属性配置; | |||
|               * 这里面会有三种可能,为 null、false、true | |||
|               * 而只有在 mandatory 为 null 时才会读取 publisher-return 属性值 | |||
|               **/ | |||
|             Boolean mandatory = this.properties.getTemplate().getMandatory(); | |||
|             return mandatory != null ? mandatory : this.properties.isPublisherReturns(); | |||
|         } | |||
| </syntaxhighlight> | |||
| 从上面的源码可以获取如下信息: | |||
| # spring.rabbitmq.template.mandatory 属性的优先级高于 spring.rabbitmq.publisher-returns; | |||
| # spring.rabbitmq.template.mandatory 属性可能会返回三种值 null、false、true; | |||
| # spring.rabbitmq.template.mandatory 结果为 true、false 时会忽略掉 spring.rabbitmq.publisher-returns 属性的值; | |||
| # spring.rabbitmq.template.mandatory 结果为 null(即不配置)时结果由 spring.rabbitmq.publisher-returns 确定; | |||
| === 关键代码 === | |||
| '''配置类''': | |||
| <syntaxhighlight lang="Java" highlight=""> | |||
|      @Bean |      @Bean | ||
|      public Queue  |     public AmqpTemplate amqpTemplate() { | ||
|          //  |         Logger log = LoggerFactory.getLogger(RabbitTemplate.class); | ||
|          return  | |||
|         // 使用jackson 消息转换器 | |||
|         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); | |||
|         rabbitTemplate.setEncoding("UTF-8"); | |||
|         // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true | |||
|         rabbitTemplate.setMandatory(true); | |||
|         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { | |||
|             String correlationId = message.getMessageProperties().getCorrelationIdString(); | |||
|             log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); | |||
|         }); | |||
|         // 消息确认,yml需要配置 publisher-confirms: true | |||
|         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { | |||
|             if (ack) { | |||
|                 log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); | |||
|             } else { | |||
|                 log.debug("消息发送到exchange失败,原因: {}", cause); | |||
|             } | |||
|         }); | |||
|         return rabbitTemplate; | |||
|     } | |||
|     /** | |||
|      * 声明Direct交换机 支持持久化. | |||
|      * | |||
|      * @return the exchange | |||
|      */ | |||
|     @Bean("directExchange") | |||
|     public Exchange directExchange() { | |||
|         return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); | |||
|     } | |||
|     /** | |||
|      * 声明一个队列 支持持久化. | |||
|      * | |||
|      * @return the queue | |||
|      */ | |||
|     @Bean("directQueue") | |||
|      public Queue directQueue() { | |||
|          return QueueBuilder.durable("DIRECT_QUEUE").build(); | |||
|     } | |||
|     /** | |||
|      * 通过绑定键 将指定队列绑定到一个指定的交换机 . | |||
|      * | |||
|      * @param queue    the queue | |||
|      * @param exchange the exchange | |||
|      * @return the binding | |||
|      */ | |||
|     @Bean | |||
|     public Binding directBinding(@Qualifier("directQueue") Queue queue, | |||
|                        @Qualifier("directExchange") Exchange exchange) { | |||
|          return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); | |||
|      } |      } | ||
| </syntaxhighlight> | </syntaxhighlight> | ||
| '''生产者''': | |||
| <syntaxhighlight lang="Java" highlight=""> | |||
|     // 发送消息 | |||
|     rabbitTemplate.setExchange(exchange); | |||
|     rabbitTemplate.setRoutingKey(routeKey); | |||
|     rabbitTemplate.convertAndSend(message); | |||
|     // 发送消息 | |||
|     rabbitTemplate.convertAndSend(exchange, routeKey, message); | |||
|     // 发送消息:为消息设置关联数据??? | |||
|     CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); | |||
|     rabbitTemplate.convertAndSend(exchange, routeKey, message, correlationData); | |||
|     // 发送消息:为消息设置过期时间 | |||
|     amqpTemplate.convertAndSend(exchange, routeKey, message,   | |||
|             new MessagePostProcessor() { | |||
|                 @Override | |||
|                 public Message postProcessMessage(Message message) throws AmqpException { | |||
|                     //给消息设置延迟毫秒值 | |||
|                     message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); | |||
|                     return message; | |||
|                 } | |||
|             }); | |||
| } | |||
| </syntaxhighlight> | </syntaxhighlight> | ||
| <syntaxhighlight lang=" | '''消费者''': | ||
| <syntaxhighlight lang="java" highlight=""> | |||
|     // 基础注解,指定queue的名称,可以多个。除 simple/Work 模式外,都需要配置类来配置queue、exchange及他绑定关系 | |||
|     @RabbitListener(queues = "queue") | |||
|     @RabbitHandler | |||
|     public void processSimpleMsg(String message) { | |||
|         System.out.println("########################received simple" + message); | |||
|     } | |||
|    // 如果不想使用配置类,可以直接注解通过 bindings,绑定,spring 会根据注解生成绑定 | |||
| //  |    // ps:如果已有同名称的类。不会覆盖。会影响功能 | ||
| @ |     @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "queue", durable = "true"), | ||
|                                                 exchange = @Exchange(value = "exchange", type = "direct"),  | |||
|                                                 key = {"routeKey1","routeKey2"})  | |||
|                                }) | |||
|      @RabbitHandler |      @RabbitHandler | ||
|      public void  |      public void processDirectMsg(String message) { | ||
|          System.out.println(" |          System.out.println("########################received" + message); | ||
|      } |      } | ||
| </syntaxhighlight> | |||
| === 常用注解说明 === | |||
| ==== @Exchange ==== | |||
| @Exchange 是声明交换及交换机的一些属性: | |||
| <syntaxhighlight lang="Java" highlight=""> | |||
| @Target({}) | |||
| @Retention(RetentionPolicy.RUNTIME) | |||
| public @interface Exchange { | |||
|     String TRUE = "true"; | |||
|     String FALSE = "false"; | |||
|     /** | |||
|      * @return the exchange name. | |||
|      */ | |||
|     @AliasFor("name") | |||
|     String value() default ""; | |||
|     /** | |||
|      * @return the exchange name. | |||
|      * @since 2.0 | |||
|      */ | |||
|     @AliasFor("value") | |||
|     String name() default ""; | |||
|     /** | |||
|      * 交换机类型,默认DIRECT | |||
|      */ | |||
|     String type() default ExchangeTypes.DIRECT; | |||
|     /** | |||
|      * 是否持久化 | |||
|      */ | |||
|     String durable() default TRUE; | |||
|     /** | |||
|      * 是否自动删除 | |||
|      */ | |||
|     String autoDelete() default FALSE; | |||
|     /** | |||
|      * @return the arguments to apply when declaring this exchange. | |||
|      * @since 1.6 | |||
|      */ | |||
|     Argument[] arguments() default {}; | |||
| } | } | ||
| </syntaxhighlight> | </syntaxhighlight> | ||
| ==  | ==== @Queue ==== | ||
| @Queue 是声明队列及队列的一些属性,主要参数如下: | |||
| <syntaxhighlight lang="Java" highlight="34-36,39-40,42"> | |||
| @Target({}) | |||
| @Retention(RetentionPolicy.RUNTIME) | |||
| public @interface Queue { | |||
|     /** | |||
|      * @return the queue name or "" for a generated queue name (default). | |||
|      */ | |||
|     @AliasFor("name") | |||
|     String value() default ""; | |||
|     /** | |||
|      * @return the queue name or "" for a generated queue name (default). | |||
|      * @since 2.0 | |||
|      */ | |||
|     @AliasFor("value") | |||
|     String name() default ""; | |||
|     /** | |||
|      * 是否持久化 | |||
|      */ | |||
|     String durable() default ""; | |||
|     /** | |||
|      * 是否独享、排外的. | |||
|      */ | |||
|     String exclusive() default ""; | |||
|     /** | |||
|      * 是否自动删除; | |||
|      */ | |||
|     String autoDelete() default ""; | |||
|     /** | |||
|      * 队列的其他属性参数 | |||
|      *(1)x-message-ttl:消息的过期时间,单位:毫秒; | |||
|      *(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒; | |||
|      *(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息; | |||
|      *(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息; | |||
|      *(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head; | |||
|      *(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中; | |||
|      *(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值 | |||
|      *(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false) | |||
|      *(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级; | |||
|      *(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息; | |||
|      *(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。 | |||
|      */ | |||
|     Argument[] arguments() default {}; | |||
| } | |||
| </syntaxhighlight> | </syntaxhighlight> | ||
| ==== @QueueBinding ==== | |||
| @QueueBinding作用就是将队列和交换机进行绑定,主要有以下三个参数: | |||
| <syntaxhighlight lang="Java" highlight=""> | |||
| @Target({}) | |||
| @Retention(RetentionPolicy.RUNTIME) | |||
| public @interface QueueBinding { | |||
|     /** | |||
|      * @return the queue. | |||
|      */ | |||
|     Queue value(); | |||
|     /** | |||
|      * @return the exchange. | |||
|      */ | |||
|     Exchange exchange(); | |||
|     /** | |||
|      * @return the routing key or pattern for the binding. | |||
|      * Multiple elements will result in multiple bindings. | |||
|      */ | |||
|     String[] key() default {}; | |||
| } | |||
| </syntaxhighlight> | </syntaxhighlight> | ||
| == 使用自定义消息类型 == | === 使用自定义消息类型 === | ||
| 前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义'''Java对象类型'''的数据。 | 前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义'''Java对象类型'''的数据。 | ||
| === 定义一个实体对象 === | ==== 定义一个实体对象 ==== | ||
| <syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
| package com.tizi365.rabbitmq.domain; | package com.tizi365.rabbitmq.domain; | ||
| 第149行: | 第431行: | ||
| </syntaxhighlight> | </syntaxhighlight> | ||
| === 发送自定义类型消息 === | ==== 发送自定义类型消息 ==== | ||
| <syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
| Blog blog = new Blog(); | Blog blog = new Blog(); | ||
| 第159行: | 第441行: | ||
| </syntaxhighlight> | </syntaxhighlight> | ||
| === 接收自定义类型消息 === | ==== 接收自定义类型消息 ==== | ||
| <syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> | ||
| @RabbitHandler | @RabbitHandler | ||
| 第168行: | 第450行: | ||
| </syntaxhighlight> | </syntaxhighlight> | ||
| === 使用Json序列化消息内容 === | ==== 使用Json序列化消息内容 ==== | ||
| RabbitMQ 发送Java实体对象数据的时候,默认使用'''JDK的对象序列化工具'''。我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。 | RabbitMQ 发送Java实体对象数据的时候,默认使用'''JDK的对象序列化工具'''。我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。 | ||
| 第178行: | 第460行: | ||
|     // 设置默认消息转换器 |     // 设置默认消息转换器 | ||
|     return new Jackson2JsonMessageConverter(); |     return new Jackson2JsonMessageConverter(); | ||
| } | |||
| </syntaxhighlight> | |||
| == 集成示例 == | |||
| # '''maven''': | |||
| #: <syntaxhighlight lang="xml" highlight=""> | |||
| <dependencies> | |||
|     <dependency> | |||
|         <groupId>org.springframework.boot</groupId> | |||
|         <artifactId>spring-boot-starter-amqp</artifactId> | |||
|     </dependency> | |||
|     <dependency> | |||
|         <groupId>com.fasterxml.jackson.core</groupId> | |||
|         <artifactId>jackson-databind</artifactId> | |||
|         <version>2.9.6</version> | |||
|     </dependency> | |||
|     <dependency> | |||
|         <groupId>org.springframework.boot</groupId> | |||
|         <artifactId>spring-boot-starter-test</artifactId> | |||
|         <exclusions> | |||
|             <exclusion> | |||
|                 <groupId>com.vaadin.external.google</groupId> | |||
|                 <artifactId>android-json</artifactId> | |||
|             </exclusion> | |||
|         </exclusions> | |||
|     </dependency> | |||
| </dependencies> | |||
| </syntaxhighlight> | |||
| # '''application.yml''': | |||
| #: <syntaxhighlight lang="yaml" highlight=""> | |||
| spring: | |||
|   rabbitmq: | |||
|     host: 127.0.0.1 | |||
|     port: 5672 | |||
|     username: spring | |||
|     password: 123456 | |||
|     publisher-confirms: true #支持发布确认 | |||
|     publisher-returns: true  #支持发布返回 | |||
|     listener: | |||
|       simple: | |||
|         acknowledge-mode: manual #采用手动应答 | |||
|         concurrency: 1 #指定最小的消费者数量 | |||
|         max-concurrency: 1 #指定最大的消费者数量 | |||
|         retry: | |||
|           enabled: true #是否支持重试 | |||
| </syntaxhighlight> | |||
| # '''配置类''': | |||
| #* 定制模版类、声明交换机、队列、绑定交换机到队列; | |||
| #: <syntaxhighlight lang="Java" highlight=""> | |||
| @Configuration | |||
| public class RabbitConfig { | |||
|     @Resource | |||
|     private RabbitTemplate rabbitTemplate; | |||
|     /** | |||
|      * 定制化amqp模版      可根据需要定制多个 | |||
|      * <p> | |||
|      * <p> | |||
|      * 此处为模版类定义 Jackson 消息转换器 | |||
|      * ConfirmCallback 接口用于实现消息发送到 RabbitMQ 交换器后接收 ack 回调   即消息发送到exchange  ack | |||
|      * ReturnCallback 接口用于实现消息发送到 RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack | |||
|      * | |||
|      * @return the amqp template | |||
|      */ | |||
|     // @Primary | |||
|     @Bean | |||
|     public AmqpTemplate amqpTemplate() { | |||
|         Logger log = LoggerFactory.getLogger(RabbitTemplate.class); | |||
|         // 使用jackson 消息转换器 | |||
|         rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); | |||
|         rabbitTemplate.setEncoding("UTF-8"); | |||
|         // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true | |||
|         rabbitTemplate.setMandatory(true); | |||
|         rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { | |||
|             String correlationId = message.getMessageProperties().getCorrelationIdString(); | |||
|             log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); | |||
|         }); | |||
|         // 消息确认,yml需要配置 publisher-confirms: true | |||
|         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { | |||
|             if (ack) { | |||
|                 log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); | |||
|             } else { | |||
|                 log.debug("消息发送到exchange失败,原因: {}", cause); | |||
|             } | |||
|         }); | |||
|         return rabbitTemplate; | |||
|     } | |||
|     /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */ | |||
|     /** | |||
|      * 声明Direct交换机 支持持久化. | |||
|      * | |||
|      * @return the exchange | |||
|      */ | |||
|     @Bean("directExchange") | |||
|     public Exchange directExchange() { | |||
|         return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); | |||
|     } | |||
|     /** | |||
|      * 声明一个队列 支持持久化. | |||
|      * | |||
|      * @return the queue | |||
|      */ | |||
|     @Bean("directQueue") | |||
|     public Queue directQueue() { | |||
|         return QueueBuilder.durable("DIRECT_QUEUE").build(); | |||
|     } | |||
|     /** | |||
|      * 通过绑定键 将指定队列绑定到一个指定的交换机 . | |||
|      * | |||
|      * @param queue    the queue | |||
|      * @param exchange the exchange | |||
|      * @return the binding | |||
|      */ | |||
|     @Bean | |||
|     public Binding directBinding(@Qualifier("directQueue") Queue queue, | |||
|                                  @Qualifier("directExchange") Exchange exchange) { | |||
|         return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); | |||
|     } | |||
|     /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */ | |||
|     /** | |||
|      * 声明 fanout 交换机. | |||
|      * | |||
|      * @return the exchange | |||
|      */ | |||
|     @Bean("fanoutExchange") | |||
|     public FanoutExchange fanoutExchange() { | |||
|         return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build(); | |||
|     } | |||
|     /** | |||
|      * Fanout queue A. | |||
|      * | |||
|      * @return the queue | |||
|      */ | |||
|     @Bean("fanoutQueueA") | |||
|     public Queue fanoutQueueA() { | |||
|         return QueueBuilder.durable("FANOUT_QUEUE_A").build(); | |||
|     } | |||
|     /** | |||
|      * Fanout queue B . | |||
|      * | |||
|      * @return the queue | |||
|      */ | |||
|     @Bean("fanoutQueueB") | |||
|     public Queue fanoutQueueB() { | |||
|         return QueueBuilder.durable("FANOUT_QUEUE_B").build(); | |||
|     } | |||
|     /** | |||
|      * 绑定队列A 到Fanout 交换机. | |||
|      * | |||
|      * @param queue          the queue | |||
|      * @param fanoutExchange the fanout exchange | |||
|      * @return the binding | |||
|      */ | |||
|     @Bean | |||
|     public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, | |||
|                             @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { | |||
|         return BindingBuilder.bind(queue).to(fanoutExchange); | |||
|     } | |||
|     /** | |||
|      * 绑定队列B 到Fanout 交换机. | |||
|      * | |||
|      * @param queue          the queue | |||
|      * @param fanoutExchange the fanout exchange | |||
|      * @return the binding | |||
|      */ | |||
|     @Bean | |||
|     public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, | |||
|                             @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { | |||
|         return BindingBuilder.bind(queue).to(fanoutExchange); | |||
|     } | |||
| } | |||
| </syntaxhighlight> | |||
| # '''生产者''': | |||
| #: <syntaxhighlight lang="Java" highlight=""> | |||
| @Service | |||
| public class SenderService { | |||
|     private Logger logger = LoggerFactory.getLogger(this.getClass()); | |||
|     @Resource | |||
|     private RabbitTemplate rabbitTemplate; | |||
|     /** | |||
|      * 测试广播模式. | |||
|      * | |||
|      * @param p the p | |||
|      * @return the response entity | |||
|      */ | |||
|     public void broadcast(String p) { | |||
|         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); | |||
|         rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData); | |||
|     } | |||
|     /** | |||
|      * 测试Direct模式. | |||
|      * | |||
|      * @param p the p | |||
|      * @return the response entity | |||
|      */ | |||
|     public void direct(String p) { | |||
|         CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); | |||
|         rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData); | |||
|     } | |||
| } | |||
| </syntaxhighlight> | |||
| # '''消费者''': | |||
| #: <syntaxhighlight lang="Java" highlight=""> | |||
| @Component | |||
| public class Receiver { | |||
|     private static final Logger log = LoggerFactory.getLogger(Receiver.class); | |||
|     /** | |||
|      * FANOUT广播队列监听一. | |||
|      * | |||
|      * @param message the message | |||
|      * @param channel the channel | |||
|      * @throws IOException the io exception  这里异常需要处理 | |||
|      */ | |||
|     @RabbitListener(queues = {"FANOUT_QUEUE_A"}) | |||
|     public void on(Message message, Channel channel) throws IOException { | |||
|         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); | |||
|         log.debug("FANOUT_QUEUE_A " + new String(message.getBody())); | |||
|     } | |||
|     /** | |||
|      * FANOUT广播队列监听二. | |||
|      * | |||
|      * @param message the message | |||
|      * @param channel the channel | |||
|      * @throws IOException the io exception   这里异常需要处理 | |||
|      */ | |||
|     @RabbitListener(queues = {"FANOUT_QUEUE_B"}) | |||
|     public void t(Message message, Channel channel) throws IOException { | |||
|         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); | |||
|         log.debug("FANOUT_QUEUE_B " + new String(message.getBody())); | |||
|     } | |||
|     /** | |||
|      * DIRECT模式. | |||
|      * | |||
|      * @param message the message | |||
|      * @param channel the channel | |||
|      * @throws IOException the io exception  这里异常需要处理 | |||
|      */ | |||
|     @RabbitListener(queues = {"DIRECT_QUEUE"}) | |||
|     public void message(Message message, Channel channel) throws IOException { | |||
|         channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); | |||
|         log.debug("DIRECT " + new String(message.getBody())); | |||
|     } | |||
| } | } | ||
| </syntaxhighlight> | </syntaxhighlight> | ||
2022年10月31日 (一) 21:09的最新版本
关于:SpringBoot 集成 RabbitMQ
依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ 配置
(application.yml中)
简单配置:
spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: guest #账号
    password: guest #密码
全量配置
全量配置说明:
spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: guest #账号
    password: guest #密码
    virtualHost:    #链接的虚拟主机
    addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
    requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
    publisherConfirms: true  #发布确认机制是否启用
    publisherReturns: #发布返回是否启用
    connectionTimeout: #链接超时。单位ms。0表示无穷大不超时
    ### ssl相关
    ssl:
      enabled: #是否支持ssl
      keyStore: #指定持有SSL certificate的key store的路径
      keyStoreType: #key store类型 默认PKCS12
      keyStorePassword: #指定访问key store的密码
      trustStore: #指定持有SSL certificates的Trust store
      trustStoreType: #默认JKS
      trustStorePassword: #访问密码
      algorithm: #ssl使用的算法,例如,TLSv1.1
      verifyHostname: #是否开启hostname验证
    ### cache相关
    cache:
      channel: 
        size: #缓存中保持的channel数量
        checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
      connection:
        mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION
        size: #缓存的连接数,只有是CONNECTION模式时生效
    ### listener
    listener:
       type: #两种类型,SIMPLE,DIRECT
       ## simple类型
       simple:
         concurrency: #最小消费者数量
         maxConcurrency: #最大的消费者数量
         transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量
         missingQueuesFatal: #是否停止容器当容器中的队列不可用
         ## 与direct相同配置部分
         autoStartup: #是否自动启动容器
         acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
         prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
         defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
         idleEventInterval: #container events发布频率,单位ms
         ##重试机制
         retry: 
           stateless: #有无状态
           enabled:  #是否开启
           maxAttempts: #最大重试次数,默认3
           initialInterval: #重试间隔
           multiplier: #对于上一次重试的乘数
           maxInterval: #最大重试时间间隔
       direct:
         consumersPerQueue: #每个队列消费者数量
         missingQueuesFatal:
         #...其余配置看上方公共配置
     ## template相关
     template:
       mandatory: #是否启用强制信息;默认false
       receiveTimeout: #`receive()`接收方法超时时间
       replyTimeout: #`sendAndReceive()`超时时间
       exchange: #默认的交换机
       routingKey: #默认的路由
       defaultReceiveQueue: #默认的接收队列
       ## retry重试相关
       retry: 
         enabled: #是否开启
         maxAttempts: #最大重试次数
         initialInterval: #重试间隔
         multiplier: #失败间隔乘数
         maxInterval: #最大间隔
属性区别:mandatory、publisher-confirms、publisher-return
rabbitmq客户端发送消息首先发送的交换器 exchange,然后通过路由键 routingKey 和 bindingKey 比较判定需要将消息发送到那个队列 queue 上。
在这个过程有两个地方消息可能丢失:
- 消息发送到交换器 exchange 的过程,
- 消息从交换器 exchange 发送到队列 queue 的过程;
这三个属性分别用于保证以上两个过程:
- publisher-confirms:可以确保生产者到交换器 exchange 消息有没有发送成功;
- publisher-return:可以在消息没有被路由到指定的 queue 时将消息返回,而不是丢弃;
- mandatory:指定消息在没有被队列接收时是否强行退回还是直接丢弃;
- publisher-return 通常会和 mandatory 属性配合一起使用。
publisher-return、mandatory 都是指定未找到合适队列时将消息退回,各自的作用可以从 RabbitAutoConfiguration 自动化配置类中看清楚:
        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
            if (messageConverter != null) {
                template.setMessageConverter(messageConverter);
            }
            
            // 设置 rabbitmq 处理未被 queue 接收消息的模式
            template.setMandatory(this.determineMandatoryFlag());
            Template properties = this.properties.getTemplate();
            if (properties.getRetry().isEnabled()) {
                template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())))
                                     .createRetryTemplate(properties.getRetry(), Target.SENDER));
            }
            properties.getClass();
            map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
            properties.getClass();
            map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
            properties.getClass();
            map.from(properties::getExchange).to(template::setExchange);
            properties.getClass();
            map.from(properties::getRoutingKey).to(template::setRoutingKey);
            properties.getClass();
            map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
            return template;
        }
        
        // 判定是否将未找到合适 queue 的消息退回
        private boolean determineMandatoryFlag() {
          	/**
              * 获取 spring.rabbitmq.template.mandatory 属性配置;
              * 这里面会有三种可能,为 null、false、true
              * 而只有在 mandatory 为 null 时才会读取 publisher-return 属性值
              **/
            Boolean mandatory = this.properties.getTemplate().getMandatory();
            return mandatory != null ? mandatory : this.properties.isPublisherReturns();
        }
从上面的源码可以获取如下信息:
- spring.rabbitmq.template.mandatory 属性的优先级高于 spring.rabbitmq.publisher-returns;
- spring.rabbitmq.template.mandatory 属性可能会返回三种值 null、false、true;
- spring.rabbitmq.template.mandatory 结果为 true、false 时会忽略掉 spring.rabbitmq.publisher-returns 属性的值;
- spring.rabbitmq.template.mandatory 结果为 null(即不配置)时结果由 spring.rabbitmq.publisher-returns 确定;
关键代码
配置类:
    @Bean
    public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
        
        // 使用jackson 消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
        
        // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
        
        // 消息确认,yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息发送到exchange失败,原因: {}", cause);
            }
        });
        return rabbitTemplate;
    }
    
    
    /**
     * 声明Direct交换机 支持持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }
    /**
     * 声明一个队列 支持持久化.
     *
     * @return the queue
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").build();
    }
    /**
     * 通过绑定键 将指定队列绑定到一个指定的交换机 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue,
                       @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    }
生产者:
    // 发送消息
    rabbitTemplate.setExchange(exchange);
    rabbitTemplate.setRoutingKey(routeKey);
    rabbitTemplate.convertAndSend(message);
    
    
    // 发送消息
    rabbitTemplate.convertAndSend(exchange, routeKey, message);
    
    // 发送消息:为消息设置关联数据???
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend(exchange, routeKey, message, correlationData);
    // 发送消息:为消息设置过期时间
    amqpTemplate.convertAndSend(exchange, routeKey, message, 
            new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //给消息设置延迟毫秒值
                    message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                    return message;
                }
            });
消费者:
    // 基础注解,指定queue的名称,可以多个。除 simple/Work 模式外,都需要配置类来配置queue、exchange及他绑定关系
    @RabbitListener(queues = "queue")
    @RabbitHandler
    public void processSimpleMsg(String message) {
        System.out.println("########################received simple" + message);
    }
   // 如果不想使用配置类,可以直接注解通过 bindings,绑定,spring 会根据注解生成绑定
   // ps:如果已有同名称的类。不会覆盖。会影响功能
    @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "queue", durable = "true"),
                                                exchange = @Exchange(value = "exchange", type = "direct"), 
                                                key = {"routeKey1","routeKey2"}) 
                               })
    @RabbitHandler
    public void processDirectMsg(String message) {
        System.out.println("########################received" + message);
    }
常用注解说明
@Exchange
@Exchange 是声明交换及交换机的一些属性:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {
    String TRUE = "true";
    String FALSE = "false";
    /**
     * @return the exchange name.
     */
    @AliasFor("name")
    String value() default "";
    /**
     * @return the exchange name.
     * @since 2.0
     */
    @AliasFor("value")
    String name() default "";
    /**
     * 交换机类型,默认DIRECT
     */
    String type() default ExchangeTypes.DIRECT;
    /**
     * 是否持久化
     */
    String durable() default TRUE;
    /**
     * 是否自动删除
     */
    String autoDelete() default FALSE;
    /**
     * @return the arguments to apply when declaring this exchange.
     * @since 1.6
     */
    Argument[] arguments() default {};
}
@Queue
@Queue 是声明队列及队列的一些属性,主要参数如下:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {
    /**
     * @return the queue name or "" for a generated queue name (default).
     */
    @AliasFor("name")
    String value() default "";
    /**
     * @return the queue name or "" for a generated queue name (default).
     * @since 2.0
     */
    @AliasFor("value")
    String name() default "";
    /**
     * 是否持久化
     */
    String durable() default "";
    /**
     * 是否独享、排外的.
     */
    String exclusive() default "";
    /**
     * 是否自动删除;
     */
    String autoDelete() default "";
    /**
     * 队列的其他属性参数
     *(1)x-message-ttl:消息的过期时间,单位:毫秒;
     *(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
     *(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
     *(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
     *(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
     *(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
     *(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
     *(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
     *(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
     *(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
     *(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
     */
    Argument[] arguments() default {};
}
@QueueBinding
@QueueBinding作用就是将队列和交换机进行绑定,主要有以下三个参数:
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding {
    /**
     * @return the queue.
     */
    Queue value();
    /**
     * @return the exchange.
     */
    Exchange exchange();
    /**
     * @return the routing key or pattern for the binding.
     * Multiple elements will result in multiple bindings.
     */
    String[] key() default {};
}
使用自定义消息类型
前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义Java对象类型的数据。
定义一个实体对象
package com.tizi365.rabbitmq.domain;
import java.io.Serializable;
import lombok.Data;
// 博客内容
@Data
public class Blog implements Serializable {
    // id
    private Integer id;
    // 标题
    private String title;
}
发送自定义类型消息
Blog blog = new Blog();
blog.setId(100);
blog.setTitle("Tizi365 RabbitMQ教程");
// 发送消息
template.convertAndSend(helloQueue.getName(), blog);
接收自定义类型消息
@RabbitHandler
// 方法参数改为自定义消息类型即可
public void receive(Blog msg) {
    System.out.println("消费者 - 收到消息 '" + msg.getTitle() + "'");
}
使用Json序列化消息内容
RabbitMQ 发送Java实体对象数据的时候,默认使用JDK的对象序列化工具。我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。
修改以前的配置类,增加下面配置, 使用Jackson json解析器对消息数据进行序列化和反序列化。
@Bean
public Jackson2JsonMessageConverter messageConverter() {
   // 设置默认消息转换器
   return new Jackson2JsonMessageConverter();
}
集成示例
- maven:
- <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <exclusions> <exclusion> <groupId>com.vaadin.external.google</groupId> <artifactId>android-json</artifactId> </exclusion> </exclusions> </dependency> </dependencies> 
 
- application.yml:
- spring: rabbitmq: host: 127.0.0.1 port: 5672 username: spring password: 123456 publisher-confirms: true #支持发布确认 publisher-returns: true #支持发布返回 listener: simple: acknowledge-mode: manual #采用手动应答 concurrency: 1 #指定最小的消费者数量 max-concurrency: 1 #指定最大的消费者数量 retry: enabled: true #是否支持重试 
 
- 配置类:
- 定制模版类、声明交换机、队列、绑定交换机到队列;
 - @Configuration public class RabbitConfig { @Resource private RabbitTemplate rabbitTemplate; /** * 定制化amqp模版 可根据需要定制多个 * <p> * <p> * 此处为模版类定义 Jackson 消息转换器 * ConfirmCallback 接口用于实现消息发送到 RabbitMQ 交换器后接收 ack 回调 即消息发送到exchange ack * ReturnCallback 接口用于实现消息发送到 RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack * * @return the amqp template */ // @Primary @Bean public AmqpTemplate amqpTemplate() { Logger log = LoggerFactory.getLogger(RabbitTemplate.class); // 使用jackson 消息转换器 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setEncoding("UTF-8"); // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); // 消息确认,yml需要配置 publisher-confirms: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); } else { log.debug("消息发送到exchange失败,原因: {}", cause); } }); return rabbitTemplate; } /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */ /** * 声明Direct交换机 支持持久化. * * @return the exchange */ @Bean("directExchange") public Exchange directExchange() { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); } /** * 声明一个队列 支持持久化. * * @return the queue */ @Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").build(); } /** * 通过绑定键 将指定队列绑定到一个指定的交换机 . * * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); } /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */ /** * 声明 fanout 交换机. * * @return the exchange */ @Bean("fanoutExchange") public FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build(); } /** * Fanout queue A. * * @return the queue */ @Bean("fanoutQueueA") public Queue fanoutQueueA() { return QueueBuilder.durable("FANOUT_QUEUE_A").build(); } /** * Fanout queue B . * * @return the queue */ @Bean("fanoutQueueB") public Queue fanoutQueueB() { return QueueBuilder.durable("FANOUT_QUEUE_B").build(); } /** * 绑定队列A 到Fanout 交换机. * * @param queue the queue * @param fanoutExchange the fanout exchange * @return the binding */ @Bean public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } /** * 绑定队列B 到Fanout 交换机. * * @param queue the queue * @param fanoutExchange the fanout exchange * @return the binding */ @Bean public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } } 
 
- 生产者:
- @Service public class SenderService { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private RabbitTemplate rabbitTemplate; /** * 测试广播模式. * * @param p the p * @return the response entity */ public void broadcast(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData); } /** * 测试Direct模式. * * @param p the p * @return the response entity */ public void direct(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData); } } 
 
- 消费者:
- @Component public class Receiver { private static final Logger log = LoggerFactory.getLogger(Receiver.class); /** * FANOUT广播队列监听一. * * @param message the message * @param channel the channel * @throws IOException the io exception 这里异常需要处理 */ @RabbitListener(queues = {"FANOUT_QUEUE_A"}) public void on(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("FANOUT_QUEUE_A " + new String(message.getBody())); } /** * FANOUT广播队列监听二. * * @param message the message * @param channel the channel * @throws IOException the io exception 这里异常需要处理 */ @RabbitListener(queues = {"FANOUT_QUEUE_B"}) public void t(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("FANOUT_QUEUE_B " + new String(message.getBody())); } /** * DIRECT模式. * * @param message the message * @param channel the channel * @throws IOException the io exception 这里异常需要处理 */ @RabbitListener(queues = {"DIRECT_QUEUE"}) public void message(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("DIRECT " + new String(message.getBody())); } }