“RabbitMQ:SpringBoot集成”的版本间差异

来自Wikioe
跳到导航 跳到搜索
无编辑摘要
 
(未显示同一用户的33个中间版本)
第2行: 第2行:
[[category:SpringBoot]]
[[category:SpringBoot]]


== 关于 ==
== 关于:SpringBoot 集成 RabbitMQ ==
目前 Java 操作 RabbitMQ 主要使用 Springboot 的 '''spring-boot-starter-amqp''' 包, 其实就是使用 '''Spring AMQP''' 操作队列。


 
=== 依赖 ===
* 无论使用 RabbitMQ 那种工作模式,区别就是使用的交换机(Exchange)类型和路由参数不一样。
** 简单队列、Work模式,底层会使用默认的交换机(Direct交换机);
 
=== Maven依赖 ===
<syntaxhighlight lang="xml" highlight="">
<syntaxhighlight lang="xml" highlight="">
<dependency>
<dependency>
     <groupid>org.springframework.boot</groupid>
     <groupId>org.springframework.boot</groupId>
     <artifactid>spring-boot-starter-amqp</artifactid>
     <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependency>
</syntaxhighlight>
</syntaxhighlight>


=== 配置RabbitMQ ===
=== RabbitMQ 配置 ===
修改'''application.yml'''配置:
'''application.yml'''中)
<syntaxhighlight lang="xml" highlight="">
 
简单配置:
<syntaxhighlight lang="yaml" highlight="">
spring:
spring:
   rabbitmq:
   rabbitmq:
    # rabbitMQ服务器地址
     host: 127.0.0.1 #ip
     host: localhost
     port: 5672     #端口
     port: 5672
     username: guest #账号
     username: guest
     password: guest #密码
     password: guest
</syntaxhighlight>
</syntaxhighlight>


== 工作模式 ==
以下示例,生产者、消费者位于不同项目,所以两个项目都需要:引入Maven依赖、RabbitMQ配置。
* 根据不同的工作模式,生成者、消费者并都不需要配置队列/交换机;(见:'''[[RabbitMQ:Java]]''')
*(以下省略了Maven依赖、配置文件内容)


=== 快速入门(简单队列) ===
==== 全量配置 ====
----
全量配置说明:
Java RabbitMQ最简单的队列模式就是一个生产者和一个消费者。
<syntaxhighlight lang="yaml" highlight="">
: [[File:RabbitMQ工作模式:简单队列.png|400px]]
spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: guest #账号
    password: guest #密码
    virtualHost:    #链接的虚拟主机
    addresses: 127.0.0.1:5672    #多个以逗号分隔,与host功能一样。
    requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
    publisherConfirms: true  #发布确认机制是否启用
    publisherReturns: #发布返回是否启用
    connectionTimeout: #链接超时。单位ms。0表示无穷大不超时
    ### ssl相关
    ssl:
      enabled: #是否支持ssl
      keyStore: #指定持有SSL certificate的key store的路径
      keyStoreType: #key store类型 默认PKCS12
      keyStorePassword: #指定访问key store的密码
      trustStore: #指定持有SSL certificates的Trust store
      trustStoreType: #默认JKS
      trustStorePassword: #访问密码
      algorithm: #ssl使用的算法,例如,TLSv1.1
      verifyHostname: #是否开启hostname验证
    ### cache相关
    cache:
      channel:
        size: #缓存中保持的channel数量
        checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
      connection:
        mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION
        size: #缓存的连接数,只有是CONNECTION模式时生效
    ### listener
    listener:
      type: #两种类型,SIMPLE,DIRECT
      ## simple类型
      simple:
        concurrency: #最小消费者数量
        maxConcurrency: #最大的消费者数量
        transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量
        missingQueuesFatal: #是否停止容器当容器中的队列不可用
        ## 与direct相同配置部分
        autoStartup: #是否自动启动容器
        acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
        prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
        defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
        idleEventInterval: #container events发布频率,单位ms
        ##重试机制
        retry:
          stateless: #有无状态
          enabled:  #是否开启
          maxAttempts: #最大重试次数,默认3
          initialInterval: #重试间隔
          multiplier: #对于上一次重试的乘数
          maxInterval: #最大重试时间间隔
      direct:
        consumersPerQueue: #每个队列消费者数量
        missingQueuesFatal:
        #...其余配置看上方公共配置
    ## template相关
    template:
      mandatory: #是否启用强制信息;默认false
      receiveTimeout: #`receive()`接收方法超时时间
      replyTimeout: #`sendAndReceive()`超时时间
      exchange: #默认的交换机
      routingKey: #默认的路由
      defaultReceiveQueue: #默认的接收队列
      ## retry重试相关
      retry:
        enabled: #是否开启
        maxAttempts: #最大重试次数
        initialInterval: #重试间隔
        multiplier: #失败间隔乘数
        maxInterval: #最大间隔
</syntaxhighlight>


==== 发送消息 ====
==== 属性区别:mandatory、publisher-confirms、publisher-return ====
# '''配置队列''':
rabbitmq客户端发送消息首先发送的交换器 exchange,然后通过路由键 routingKey 和 bindingKey 比较判定需要将消息发送到那个队列 queue 上。
#: 通过 springboot 的 '''configuration 类'''配置队列:
#: <syntaxhighlight lang="java" highlight="7,9,12">
package com.tizi365.rabbitmq.config;


import org.springframework.amqp.core.Queue;
在这个过程有两个地方消息可能丢失:
import org.springframework.context.annotation.Bean;
# 消息发送到交换器 exchange 的过程,
import org.springframework.context.annotation.Configuration;
# 消息从交换器 exchange 发送到队列 queue 的过程;


@Configuration
public class QueueConfig {
    @Bean
    public Queue helloQueue() {
        // 声明队列, 队列名需要唯一
        return new Queue("hello");
    }
}
</syntaxhighlight>
#* 可以根据业务需要定义多个队列,Queue name(队列名)和 Bean id(此处即方法名)不一样即可。
#*: 如上,Bean id:“helloQueue”;Queue name:“hello”;
# '''发送消息''':
#: 发送消息需要用到 '''RabbitTemplate 类''',springboot已经帮我们初始化,注入实例即可:
#: <syntaxhighlight lang="Java" highlight="10,14,18,19,30">
package com.tizi365.rabbitmq.service;


import org.springframework.amqp.core.Queue;
这三个属性分别用于保证以上两个过程:
import org.springframework.amqp.rabbit.core.RabbitTemplate;
# '''publisher-confirms''':可以确保生产者到交换器 exchange 消息有没有发送成功;
import org.springframework.beans.factory.annotation.Autowired;
# '''publisher-return''':可以在消息没有被路由到指定的 queue 时将消息返回,而不是丢弃;
import org.springframework.beans.factory.annotation.Qualifier;
# '''mandatory''':指定消息在没有被队列接收时是否强行退回还是直接丢弃;
import org.springframework.scheduling.annotation.Scheduled;
* publisher-return 通常会和 mandatory 属性配合一起使用。
import org.springframework.stereotype.Service;


@Service
public class SendService {
    // 注入RabbitTemplate实例
    @Autowired
    private RabbitTemplate template;


    // 注入前面定义的队列
publisher-return、mandatory 都是指定未找到合适队列时将消息退回,各自的作用可以从 RabbitAutoConfiguration 自动化配置类中看清楚:
    @Autowired
<syntaxhighlight lang="Java" highlight="">
    @Qualifier("helloQueue")
        @Bean
    private Queue helloQueue;
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
            if (messageConverter != null) {
                template.setMessageConverter(messageConverter);
            }
           
            // 设置 rabbitmq 处理未被 queue 接收消息的模式
            template.setMandatory(this.determineMandatoryFlag());
            Template properties = this.properties.getTemplate();
            if (properties.getRetry().isEnabled()) {
                template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())))
                                    .createRetryTemplate(properties.getRetry(), Target.SENDER));
            }


    // 为了演示,这里使用spring内置的定时任务,定时发送消息(每秒发送一条消息)
            properties.getClass();
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
            map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
    public void send() {
            properties.getClass();
        // 消息内容
            map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
         String message = "Hello World!";
            properties.getClass();
            map.from(properties::getExchange).to(template::setExchange);
            properties.getClass();
            map.from(properties::getRoutingKey).to(template::setRoutingKey);
            properties.getClass();
            map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
            return template;
         }
          
          
         // 发送消息
         // 判定是否将未找到合适 queue 的消息退回
         // 第一个参数是路由参数,这里使用队列名作为路由参数
         private boolean determineMandatoryFlag() {
        // 第二个参数是消息内容,支持任意类型,只要支持序列化
          /**
        template.convertAndSend(helloQueue.getName(), message);
              * 获取 spring.rabbitmq.template.mandatory 属性配置;
       
              * 这里面会有三种可能,为 null、false、true
        System.out.println("发送消息 '" + message + "'");
              * 而只有在 mandatory 为 null 时才会读取 publisher-return 属性值
    }
              **/
}
            Boolean mandatory = this.properties.getTemplate().getMandatory();
            return mandatory != null ? mandatory : this.properties.isPublisherReturns();
        }
</syntaxhighlight>
</syntaxhighlight>
#* 这里没有直接使用交换机(exchange),底层会使用默认的交换机('''Direct交换机'''???)。
从上面的源码可以获取如下信息:
# spring.rabbitmq.template.mandatory 属性的优先级高于 spring.rabbitmq.publisher-returns;
# spring.rabbitmq.template.mandatory 属性可能会返回三种值 null、false、true;
# spring.rabbitmq.template.mandatory 结果为 true、false 时会忽略掉 spring.rabbitmq.publisher-returns 属性的值;
# spring.rabbitmq.template.mandatory 结果为 null(即不配置)时结果由 spring.rabbitmq.publisher-returns 确定;


==== 接收消息 ====
=== 关键代码 ===
# '''配置队列''':
'''配置类''':
#: 通过 springboot 的 '''configuration 类'''配置队列:
<syntaxhighlight lang="Java" highlight="">
#: <syntaxhighlight lang="java" highlight="7,9,12">
package com.tizi365.rabbitmq.config;
 
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
@Configuration
public class QueueConfig {
     @Bean
     @Bean
     public Queue helloQueue() {
     public AmqpTemplate amqpTemplate() {
         // 声明队列, 队列名需要唯一
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
         return new Queue("hello");
       
        // 使用jackson 消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
       
         // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
       
        // 消息确认,yml需要配置 publisher-confirms: true
         rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息发送到exchange失败,原因: {}", cause);
            }
        });
        return rabbitTemplate;
     }
     }
}
   
</syntaxhighlight>
   
# '''接收消息''':
    /**
#: 接收消息需要用 '''@RabbitListener(queues = "xxx") 注解'''指定接受哪个 Queue 的消息:
    * 声明Direct交换机 支持持久化.
#* “@RabbitListener”注解可以作用在类上,也可以作用在方法上;
    *
#** 如果其定义在类上,则需要配合“'''@RabbitHandler'''”注解标记由那个类方法执行消息处理。
    * @return the exchange
#: <syntaxhighlight lang="Java" highlight="9,12">
    */
package com.tizi365.rabbitmq.listener;
   @Bean("directExchange")
 
     public Exchange directExchange() {
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
         return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
@Component
// 声明消息监听器,通过queues参数指定监听的队列,需要跟前面的队列名保持一致
@RabbitListener(queues = "hello")
public class HelloListener {
    // 使用RabbitHandler标记消息处理器,用来执行消息处理逻辑
    @RabbitHandler
     public void receive(String msg) {
         System.out.println("消费者 - 收到消息 '" + msg + "'");
     }
     }
}
</syntaxhighlight>
=== Work模式 ===
----
Java RabbitMQ Work模式,就是配置多个消费者消费一个队列的消息,可以提高消息的并发处理速度:
: [[File:RabbitMQ工作模式:工作队列(Work模式).png|400px]]
==== 接收消息(配置多个消费者) ====
通过'''@RabbitListener'''注解,配置'''concurrency'''参数即可实现。
如下,启动10个消费者并发处理消息:
<syntaxhighlight lang="Java" highlight="10">
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
// 声明消息监听器,通过queues参数指定监听的队列
// 关键参数:concurrency 代表当前监听器需要启动多少个消费者,类型是字符串
@RabbitListener(queues = "hello", concurrency = "10")
public class HelloListener {
    // 使用RabbitHandler标记消息处理器,用来执行消息处理逻辑
    @RabbitHandler
    public void receive(String msg) {
        System.out.println("消费者 - 收到消息 '" + msg + "'");
    }
}
</syntaxhighlight>
=== 发布订阅模式 ===
----
Java RabbitMQ发布订阅模式(广播模式、fanout模式),使用的交换机类型为'''FanoutExchange''',就是一个生产者发送的消息会被多个队列的消费者处理:
: [[File:RabbitMQ工作模式:发布订阅模式(广播模式、fanout模式).png|400px]]
==== 发送消息 ====
# 配置交换机:
#: <syntaxhighlight lang="Java" highlight="11,17,23,29,35">
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
   /**
public class QueueConfig {
    * 声明一个队列 支持持久化.
    // 定义交换机
    *
    @Bean
    * @return the queue
     public FanoutExchange fanout() {
    */
        // 参数为交换机名字,不能重复
   @Bean("directQueue")
         return new FanoutExchange("tizi365.fanout");
     public Queue directQueue() {
         return QueueBuilder.durable("DIRECT_QUEUE").build();
     }
     }
}
</syntaxhighlight>
# '''发送消息''':
#: <syntaxhighlight lang="Java" highlight="14,25">
package com.tizi365.rabbitmq.service;


import org.springframework.amqp.core.FanoutExchange;
   /**
import org.springframework.amqp.rabbit.core.RabbitTemplate;
    * 通过绑定键 将指定队列绑定到一个指定的交换机 .
import org.springframework.beans.factory.annotation.Autowired;
    *
import org.springframework.scheduling.annotation.Scheduled;
    * @param queue    the queue
import org.springframework.stereotype.Service;
    * @param exchange the exchange
 
    * @return the binding
@Service
    */
public class SendService {
   @Bean
    @Autowired
     public Binding directBinding(@Qualifier("directQueue") Queue queue,
    private RabbitTemplate template;
                      @Qualifier("directExchange") Exchange exchange) {
    @Autowired
         return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    private FanoutExchange fanout;
 
    // 为演示,这里使用定时任务,每秒发送一条消息
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
     public void send() {
        // 消息内容
        String message = "Hello World!";
        // 发送消息
        // 第一个参数是交换机名字
        // 第二个参数是路由参数,fanout交换机会忽略路由参数,所以不用设置
         // 第三个参数是消息内容,支持任意类型,只要支持序列化
        template.convertAndSend(fanout.getName(), "", message);
        System.out.println("发送消息 '" + message + "'");
     }
     }
}
</syntaxhighlight>
</syntaxhighlight>


==== 接收消息 ====
# '''配置交换机、队列及绑定''':
#: <syntaxhighlight lang="Java" highlight="11,17,23,29,35">
package com.tizi365.rabbitmq.config;


import org.springframework.amqp.core.FanoutExchange;
'''生产者''':
import org.springframework.context.annotation.Bean;
<syntaxhighlight lang="Java" highlight="">
import org.springframework.context.annotation.Configuration;
   // 发送消息
 
   rabbitTemplate.setExchange(exchange);
@Configuration
   rabbitTemplate.setRoutingKey(routeKey);
public class QueueConfig {
   rabbitTemplate.convertAndSend(message);
     // 定义交换机
      
     @Bean
      
    public FanoutExchange fanout() {
   // 发送消息
        // 参数为交换机名字,不能重复
   rabbitTemplate.convertAndSend(exchange, routeKey, message);
        return new FanoutExchange("tizi365.fanout");
    }
      
      
    @Bean
    public Queue queue1() {
        // 定义队列1
        return new Queue("tizi365.fanout.queue1");
    }


    @Bean
   // 发送消息:为消息设置关联数据???
    public Queue queue2() {
   CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        // 定义队列2
   rabbitTemplate.convertAndSend(exchange, routeKey, message, correlationData);
        return new Queue("tizi365.fanout.queue2");
    }


    @Bean
    public Binding binding1(FanoutExchange fanout, Queue queue1) {
        // 定义一个绑定关系,将队列1绑定到fanout交换机上
        return BindingBuilder.bind(queue1).to(fanout);
    }


    @Bean
   // 发送消息:为消息设置过期时间
    public Binding binding2(FanoutExchange fanout, Queue queue2) {
   amqpTemplate.convertAndSend(exchange, routeKey, message,
        // 定义一个绑定关系,将队列2绑定到fanout交换机上
            new MessagePostProcessor() {
        return BindingBuilder.bind(queue2).to(fanout);
                @Override
    }
                public Message postProcessMessage(Message message) throws AmqpException {
}
                    //给消息设置延迟毫秒值
                    message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                    return message;
                }
            });
</syntaxhighlight>
</syntaxhighlight>
# '''接收消息''':
#: <syntaxhighlight lang="Java" highlight="10,16">
package com.tizi365.rabbitmq.listener;


import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;


// 将当前类交给Spring管理
'''消费者''':
@Component
<syntaxhighlight lang="java" highlight="">
public class DemoListener {
   // 基础注解,指定queue的名称,可以多个。除 simple/Work 模式外,都需要配置类来配置queue、exchange及他绑定关系
    // 定义一个监听器,通过queues参数指定监听那个队列
     @RabbitListener(queues = "queue")
     @RabbitListener(queues = "tizi365.fanout.queue1")
    @RabbitHandler
     public void receive1(String msg) {
     public void processSimpleMsg(String message) {
         System.out.println("收到队列1的消息 = " + msg);
         System.out.println("########################received simple" + message);
     }
     }


    // 定义一个监听器,通过queues参数指定监听那个队列
 
     @RabbitListener(queues = "tizi365.fanout.queue2")
  // 如果不想使用配置类,可以直接注解通过 bindings,绑定,spring 会根据注解生成绑定
     public void receive2(String msg) {
  // ps:如果已有同名称的类。不会覆盖。会影响功能
         System.out.println("收到队列2的消息 = " + msg);
     @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "queue", durable = "true"),
                                                exchange = @Exchange(value = "exchange", type = "direct"),
                                                key = {"routeKey1","routeKey2"})
                              })
    @RabbitHandler
     public void processDirectMsg(String message) {
         System.out.println("########################received" + message);
     }
     }
}
</syntaxhighlight>
</syntaxhighlight>
#: 每一条消息,都会分发给所有绑定到当前交换机的队列中,消息会被上面的两个方法分别处理。
==== P.S. :全注解方式定义队列监听器 ====
直接通过 '''@RabbitListener''' 注解的 '''bindings''' 参数定义绑定关系、队列、交换机:
* 不需要前面的 springboot 配置类定义:交换机、队列和绑定关系;【???那发送消息咋搞???还是说分两个配置???】


=== 常用注解说明 ===
==== @Exchange ====
@Exchange 是声明交换及交换机的一些属性:
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
@RabbitListener(
@Target({})
            bindings = {
@Retention(RetentionPolicy.RUNTIME)
                    @QueueBinding(
public @interface Exchange {
                            value = @Queue(name = "tizi365.fanout.queue3", durable = "true"),
                            exchange = @Exchange(name = "tizi365.fanout", durable = "true", type = ExchangeTypes.FANOUT)
                    )
            }
    )
public void receive3(String msg) {
    System.out.println("收到队列3的消息 = " + msg);
}
</syntaxhighlight>
说明:
* '''QueueBinding''' 注解:定义队列和交换机的绑定关系:“value”参数用于定义队列,“exchange”用于定义交换机;
* '''Queue''' 注解:定义一个队列:“name”参数定义队列名(需要唯一),“durable”参数表示是否需要持久化;
* '''Exchange''' 注解:定义一个交换机:“name”参数定义交换机的名字,“type”参数表示交换机的类型;


=== 路由模式 ===
    String TRUE = "true";
----
Java RabbitMQ路由模式(Direct模式),使用的交换机类型为'''DirectExchange''',跟发布订阅模式的区别就是 Direct 交换机将消息投递到路由参数完全匹配的队列中。
: [[File:RabbitMQ工作模式:路由模式(Direct 模式).png|400px]]


==== 发送消息 ====
    String FALSE = "false";
# '''配置交换机''':
#: <syntaxhighlight lang="Java" highlight="12,31,38">
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
    /**
public class QueueConfig {
    * @return the exchange name.
     @Bean
    */
    public DirectExchange direct() {
     @AliasFor("name")
        // 定义交换机
     String value() default "";
        // 参数为交换机名字,不能重复
        return new DirectExchange("tizi365.direct");
     }
}
</syntaxhighlight>
# '''发送消息''':
#: <syntaxhighlight lang="Java" highlight="">
package com.tizi365.rabbitmq.service;
 
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;


@Service
    /**
public class SendService {
    * @return the exchange name.
    @Autowired
    * @since 2.0
    private RabbitTemplate template;
    */
     @Autowired
     @AliasFor("value")
     private DirectExchange direct;
     String name() default "";


     // 为演示,这里使用定时任务,每秒发送一条消息
     /**
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
    * 交换机类型,默认DIRECT
    public void send() {
    */
        // 消息内容
    String type() default ExchangeTypes.DIRECT;
        String message = "Hello World!";
        // 发送消息
        // 第一个参数是交换机名字
        // 第二个参数是路由参数,direct交换机将消息投递到路由参数匹配tizi365的队列
        // 第三个参数是消息内容,支持任意类型,只要支持序列化
        template.convertAndSend(direct.getName(), "tizi365", message);
        System.out.println("发送消息 '" + message + "'");
    }
}
</syntaxhighlight>


==== 接收消息 ====
    /**
# '''配置交换机、队列及绑定''':
    * 是否持久化
#: <syntaxhighlight lang="Java" highlight="12,31,38">
    */
package com.tizi365.rabbitmq.config;
    String durable() default TRUE;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
     /**
public class QueueConfig {
    * 是否自动删除
     @Bean
    */
    public DirectExchange direct() {
    String autoDelete() default FALSE;
        // 定义交换机
        // 参数为交换机名字,不能重复
        return new DirectExchange("tizi365.direct");
    }


     @Bean
     /**
    public Queue queue1() {
    * @return the arguments to apply when declaring this exchange.
        // 定义队列1
    * @since 1.6
        return new Queue("tizi365.direct.queue1");
    */
    }
     Argument[] arguments() default {};
 
    @Bean
    public Queue queue2() {
        // 定义队列2
        return new Queue("tizi365.direct.queue2");
     }
 
    @Bean
    public Binding binding1(DirectExchange direct, Queue queue1) {
        // 定义一个绑定关系,将队列1绑定到direct交换机上, 路由参数为:tizi365
        // 路由参数匹配tizi365,交换机将消息投递到队列1
        return BindingBuilder.bind(queue1).to(direct).with("tizi365");
    }
 
    @Bean
    public Binding binding2(DirectExchange direct, Queue queue2) {
        // 定义一个绑定关系,将队列2绑定到direct交换机上, 路由参数为:baidu
        // 路由参数匹配baidu,交换机将消息投递到队列2
        return BindingBuilder.bind(queue2).to(direct).with("baidu");
    }
}
}
</syntaxhighlight>
</syntaxhighlight>
# '''接受消息''':
#: <syntaxhighlight lang="Java" highlight="">
package com.tizi365.rabbitmq.listener;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
// 将当前类交给Spring管理
@Component
public class DemoListener {
    // 定义一个监听器,通过queues参数指定监听那个队列
    @RabbitListener(queues = "tizi365.direct.queue1")
    public void receive1(String msg) {
        System.out.println("收到队列1的消息 = " + msg);
    }
    // 定义一个监听器,通过queues参数指定监听那个队列
    @RabbitListener(queues = "tizi365.direct.queue2")
    public void receive2(String msg) {
        System.out.println("收到队列2的消息 = " + msg);
    }
}
</syntaxhighlight>
=== 主题模式 ===
Java RabbitMQ 主题模式(Topic模式),使用的交换机类型为'''TopicExchange''',跟路由模式(Direct)的区别就路由参数支持'''模糊匹配'''。
* 因为路由匹配比较灵活,所以是比较常用的模式;
: [[File:RabbitMQ工作模式:主题模式(Topic 模式).png|400px]]
==== 配置:交换机、队列 ====
通过 Springboot 配置类:
# 定义交换机(TopicExchange);
# 定义队列(Queue);
# 将队列绑定到目标交换机上(Binding);
<syntaxhighlight lang="Java" highlight="30,36">
package com.tizi365.rabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class QueueConfig {
    @Bean
    public TopicExchange topic() {
        // 定义交换机
        // 参数为交换机名字,不能重复
        return new TopicExchange("tizi365.topic");
    }


    @Bean
==== @Queue ====
    public Queue queue1() {
@Queue 是声明队列及队列的一些属性,主要参数如下:
        // 定义队列1
<syntaxhighlight lang="Java" highlight="34-36,39-40,42">
        return new Queue("tizi365.topic.queue1");
@Target({})
     }
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {
    /**
    * @return the queue name or "" for a generated queue name (default).
    */
    @AliasFor("name")
     String value() default "";


     @Bean
     /**
    public Queue queue2() {
    * @return the queue name or "" for a generated queue name (default).
        // 定义队列2
    * @since 2.0
        return new Queue("tizi365.topic.queue2");
    */
     }
    @AliasFor("value")
     String name() default "";


     @Bean
     /**
    public Binding binding1(TopicExchange topic, Queue queue1) {
    * 是否持久化
        // 定义一个绑定关系,将队列1绑定到topic交换机上, 路由参数为:*.tizi365.com
    */
        return BindingBuilder.bind(queue1).to(topic).with("*.tizi365.com");
    String durable() default "";
    }


     @Bean
     /**
    public Binding binding2(TopicExchange topic, Queue queue2) {
    * 是否独享、排外的.
        // 定义一个绑定关系,将队列2绑定到direct交换机上, 路由参数为:*.baidu.com
    */
        return BindingBuilder.bind(queue2).to(topic).with("*.baidu.com");
    String exclusive() default "";
    }
}
</syntaxhighlight>
* 支持的通配符:“'''#'''”匹配一个或多个单词;“'''*'''”:仅匹配一个单词。


==== 发送消息 ====
    /**
<syntaxhighlight lang="Java" highlight="">
    * 是否自动删除;
package com.tizi365.rabbitmq.service;
    */
    String autoDelete() default "";


import org.springframework.amqp.core.DirectExchange;
    /**
import org.springframework.amqp.core.TopicExchange;
    * 队列的其他属性参数
import org.springframework.amqp.rabbit.core.RabbitTemplate;
    *(1)x-message-ttl:消息的过期时间,单位:毫秒;
import org.springframework.beans.factory.annotation.Autowired;
    *(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
import org.springframework.scheduling.annotation.Scheduled;
    *(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
import org.springframework.stereotype.Service;
    *(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
 
    *(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
@Service
    *(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
public class SendService {
    *(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
    @Autowired
    *(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
    private RabbitTemplate template;
    *(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
    @Autowired
    *(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
    private TopicExchange topic;
    *(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
 
    */
    // 为演示,这里使用定时任务,每秒发送一条消息
     Argument[] arguments() default {};
    @Scheduled(fixedDelay = 1000, initialDelay = 1000)
     public void send() {
        // 消息内容
        String message = "Hello World!";
        // 发送消息
        // 第一个参数是交换机名字
        // 第二个参数是路由参数,topic交换机将消息投递到路由参数匹配的队列
        // 第三个参数是消息内容,支持任意类型,只要支持序列化
        template.convertAndSend(topic.getName(), "www.tizi365.com", message);
        System.out.println("发送消息 '" + message + "'");
    }
}
}
</syntaxhighlight>
</syntaxhighlight>


==== 接收消息 ====
==== @QueueBinding ====
@QueueBinding作用就是将队列和交换机进行绑定,主要有以下三个参数:
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
package com.tizi365.rabbitmq.listener;
@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding {
    /**
    * @return the queue.
    */
    Queue value();


import org.springframework.amqp.rabbit.annotation.*;
    /**
import org.springframework.stereotype.Component;
    * @return the exchange.
 
    */
// 将当前类交给Spring管理
     Exchange exchange();
@Component
public class DemoListener {
    // 定义一个监听器,通过queues参数指定监听那个队列
     @RabbitListener(queues = "tizi365.topic.queue1")
    public void receive1(String msg) {
        System.out.println("收到队列1的消息 = " + msg);
    }


     // 定义一个监听器,通过queues参数指定监听那个队列
     /**
    @RabbitListener(queues = "tizi365.topic.queue2")
    * @return the routing key or pattern for the binding.
     public void receive2(String msg) {
    * Multiple elements will result in multiple bindings.
        System.out.println("收到队列2的消息 = " + msg);
    */
    }
     String[] key() default {};
}
}
</syntaxhighlight>
</syntaxhighlight>


== 使用自定义消息类型 ==
=== 使用自定义消息类型 ===
前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义'''Java对象类型'''的数据。
前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义'''Java对象类型'''的数据。


=== 定义一个实体对象 ===
==== 定义一个实体对象 ====
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
package com.tizi365.rabbitmq.domain;
package com.tizi365.rabbitmq.domain;
第572行: 第431行:
</syntaxhighlight>
</syntaxhighlight>


=== 发送自定义类型消息 ===
==== 发送自定义类型消息 ====
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
Blog blog = new Blog();
Blog blog = new Blog();
第582行: 第441行:
</syntaxhighlight>
</syntaxhighlight>


=== 接收自定义类型消息 ===
==== 接收自定义类型消息 ====
<syntaxhighlight lang="Java" highlight="">
<syntaxhighlight lang="Java" highlight="">
@RabbitHandler
@RabbitHandler
第591行: 第450行:
</syntaxhighlight>
</syntaxhighlight>


=== 使用Json序列化消息内容 ===
==== 使用Json序列化消息内容 ====
RabbitMQ 发送Java实体对象数据的时候,默认使用'''JDK的对象序列化工具'''。我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。
RabbitMQ 发送Java实体对象数据的时候,默认使用'''JDK的对象序列化工具'''。我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。


第601行: 第460行:
   // 设置默认消息转换器
   // 设置默认消息转换器
   return new Jackson2JsonMessageConverter();
   return new Jackson2JsonMessageConverter();
}
</syntaxhighlight>
== 集成示例 ==
# '''maven''':
#: <syntaxhighlight lang="xml" highlight="">
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.6</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <exclusions>
            <exclusion>
                <groupId>com.vaadin.external.google</groupId>
                <artifactId>android-json</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
</syntaxhighlight>
# '''application.yml''':
#: <syntaxhighlight lang="yaml" highlight="">
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: spring
    password: 123456
    publisher-confirms: true #支持发布确认
    publisher-returns: true  #支持发布返回
    listener:
      simple:
        acknowledge-mode: manual #采用手动应答
        concurrency: 1 #指定最小的消费者数量
        max-concurrency: 1 #指定最大的消费者数量
        retry:
          enabled: true #是否支持重试
</syntaxhighlight>
# '''配置类''':
#* 定制模版类、声明交换机、队列、绑定交换机到队列;
#: <syntaxhighlight lang="Java" highlight="">
@Configuration
public class RabbitConfig {
   @Resource
   private RabbitTemplate rabbitTemplate;
    /**
    * 定制化amqp模版      可根据需要定制多个
    * <p>
    * <p>
    * 此处为模版类定义 Jackson 消息转换器
    * ConfirmCallback 接口用于实现消息发送到 RabbitMQ 交换器后接收 ack 回调  即消息发送到exchange  ack
    * ReturnCallback 接口用于实现消息发送到 RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack
    *
    * @return the amqp template
    */
    // @Primary
   @Bean
   public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
       
        // 使用jackson 消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
       
        // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
       
        // 消息确认,yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息发送到exchange失败,原因: {}", cause);
            }
        });
        return rabbitTemplate;
    }
    /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */
   /**
    * 声明Direct交换机 支持持久化.
    *
    * @return the exchange
    */
   @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }
   /**
    * 声明一个队列 支持持久化.
    *
    * @return the queue
    */
   @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").build();
    }
   /**
    * 通过绑定键 将指定队列绑定到一个指定的交换机 .
    *
    * @param queue    the queue
    * @param exchange the exchange
    * @return the binding
    */
   @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue,
                                @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    }
    /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */
   /**
    * 声明 fanout 交换机.
    *
    * @return the exchange
    */
   @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
    }
   /**
    * Fanout queue A.
    *
    * @return the queue
    */
   @Bean("fanoutQueueA")
    public Queue fanoutQueueA() {
        return QueueBuilder.durable("FANOUT_QUEUE_A").build();
    }
   /**
    * Fanout queue B .
    *
    * @return the queue
    */
   @Bean("fanoutQueueB")
    public Queue fanoutQueueB() {
        return QueueBuilder.durable("FANOUT_QUEUE_B").build();
    }
   /**
    * 绑定队列A 到Fanout 交换机.
    *
    * @param queue          the queue
    * @param fanoutExchange the fanout exchange
    * @return the binding
    */
   @Bean
    public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue,
                            @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
   /**
    * 绑定队列B 到Fanout 交换机.
    *
    * @param queue          the queue
    * @param fanoutExchange the fanout exchange
    * @return the binding
    */
   @Bean
    public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue,
                            @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}
</syntaxhighlight>
# '''生产者''':
#: <syntaxhighlight lang="Java" highlight="">
@Service
public class SenderService {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
   @Resource
    private RabbitTemplate rabbitTemplate;
    /**
    * 测试广播模式.
    *
    * @param p the p
    * @return the response entity
    */
    public void broadcast(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
    }
    /**
    * 测试Direct模式.
    *
    * @param p the p
    * @return the response entity
    */
    public void direct(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData);
    }
}
</syntaxhighlight>
# '''消费者''':
#: <syntaxhighlight lang="Java" highlight="">
@Component
public class Receiver {
    private static final Logger log = LoggerFactory.getLogger(Receiver.class);
    /**
    * FANOUT广播队列监听一.
    *
    * @param message the message
    * @param channel the channel
    * @throws IOException the io exception  这里异常需要处理
    */
   @RabbitListener(queues = {"FANOUT_QUEUE_A"})
    public void on(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_A " + new String(message.getBody()));
    }
    /**
    * FANOUT广播队列监听二.
    *
    * @param message the message
    * @param channel the channel
    * @throws IOException the io exception  这里异常需要处理
    */
   @RabbitListener(queues = {"FANOUT_QUEUE_B"})
    public void t(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_B " + new String(message.getBody()));
    }
    /**
    * DIRECT模式.
    *
    * @param message the message
    * @param channel the channel
    * @throws IOException the io exception  这里异常需要处理
    */
   @RabbitListener(queues = {"DIRECT_QUEUE"})
    public void message(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("DIRECT " + new String(message.getBody()));
    }
}
}
</syntaxhighlight>
</syntaxhighlight>

2022年10月31日 (一) 21:09的最新版本


关于:SpringBoot 集成 RabbitMQ

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ 配置

application.yml中)

简单配置:

spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: guest #账号
    password: guest #密码


全量配置

全量配置说明:

spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: guest #账号
    password: guest #密码
    virtualHost:    #链接的虚拟主机
    addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
    requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
    publisherConfirms: true  #发布确认机制是否启用
    publisherReturns: #发布返回是否启用
    connectionTimeout: #链接超时。单位ms。0表示无穷大不超时
    ### ssl相关
    ssl:
      enabled: #是否支持ssl
      keyStore: #指定持有SSL certificate的key store的路径
      keyStoreType: #key store类型 默认PKCS12
      keyStorePassword: #指定访问key store的密码
      trustStore: #指定持有SSL certificates的Trust store
      trustStoreType: #默认JKS
      trustStorePassword: #访问密码
      algorithm: #ssl使用的算法,例如,TLSv1.1
      verifyHostname: #是否开启hostname验证
    ### cache相关
    cache:
      channel: 
        size: #缓存中保持的channel数量
        checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
      connection:
        mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION
        size: #缓存的连接数,只有是CONNECTION模式时生效
    ### listener
    listener:
       type: #两种类型,SIMPLE,DIRECT
       ## simple类型
       simple:
         concurrency: #最小消费者数量
         maxConcurrency: #最大的消费者数量
         transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量
         missingQueuesFatal: #是否停止容器当容器中的队列不可用
         ## 与direct相同配置部分
         autoStartup: #是否自动启动容器
         acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
         prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量
         defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
         idleEventInterval: #container events发布频率,单位ms
         ##重试机制
         retry: 
           stateless: #有无状态
           enabled:  #是否开启
           maxAttempts: #最大重试次数,默认3
           initialInterval: #重试间隔
           multiplier: #对于上一次重试的乘数
           maxInterval: #最大重试时间间隔
       direct:
         consumersPerQueue: #每个队列消费者数量
         missingQueuesFatal:
         #...其余配置看上方公共配置
     ## template相关
     template:
       mandatory: #是否启用强制信息;默认false
       receiveTimeout: #`receive()`接收方法超时时间
       replyTimeout: #`sendAndReceive()`超时时间
       exchange: #默认的交换机
       routingKey: #默认的路由
       defaultReceiveQueue: #默认的接收队列
       ## retry重试相关
       retry: 
         enabled: #是否开启
         maxAttempts: #最大重试次数
         initialInterval: #重试间隔
         multiplier: #失败间隔乘数
         maxInterval: #最大间隔

属性区别:mandatory、publisher-confirms、publisher-return

rabbitmq客户端发送消息首先发送的交换器 exchange,然后通过路由键 routingKey 和 bindingKey 比较判定需要将消息发送到那个队列 queue 上。

在这个过程有两个地方消息可能丢失:

  1. 消息发送到交换器 exchange 的过程,
  2. 消息从交换器 exchange 发送到队列 queue 的过程;


这三个属性分别用于保证以上两个过程:

  1. publisher-confirms:可以确保生产者到交换器 exchange 消息有没有发送成功;
  2. publisher-return:可以在消息没有被路由到指定的 queue 时将消息返回,而不是丢弃;
  3. mandatory:指定消息在没有被队列接收时是否强行退回还是直接丢弃;
  • publisher-return 通常会和 mandatory 属性配合一起使用。


publisher-return、mandatory 都是指定未找到合适队列时将消息退回,各自的作用可以从 RabbitAutoConfiguration 自动化配置类中看清楚:

        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique();
            if (messageConverter != null) {
                template.setMessageConverter(messageConverter);
            }
            
            // 设置 rabbitmq 处理未被 queue 接收消息的模式
            template.setMandatory(this.determineMandatoryFlag());
            Template properties = this.properties.getTemplate();
            if (properties.getRetry().isEnabled()) {
                template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList())))
                                     .createRetryTemplate(properties.getRetry(), Target.SENDER));
            }

            properties.getClass();
            map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout);
            properties.getClass();
            map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout);
            properties.getClass();
            map.from(properties::getExchange).to(template::setExchange);
            properties.getClass();
            map.from(properties::getRoutingKey).to(template::setRoutingKey);
            properties.getClass();
            map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue);
            return template;
        }
        
        // 判定是否将未找到合适 queue 的消息退回
        private boolean determineMandatoryFlag() {
          	/**
              * 获取 spring.rabbitmq.template.mandatory 属性配置;
              * 这里面会有三种可能,为 null、false、true
              * 而只有在 mandatory 为 null 时才会读取 publisher-return 属性值
              **/
            Boolean mandatory = this.properties.getTemplate().getMandatory();
            return mandatory != null ? mandatory : this.properties.isPublisherReturns();
        }

从上面的源码可以获取如下信息:

  1. spring.rabbitmq.template.mandatory 属性的优先级高于 spring.rabbitmq.publisher-returns;
  2. spring.rabbitmq.template.mandatory 属性可能会返回三种值 null、false、true;
  3. spring.rabbitmq.template.mandatory 结果为 true、false 时会忽略掉 spring.rabbitmq.publisher-returns 属性的值;
  4. spring.rabbitmq.template.mandatory 结果为 null(即不配置)时结果由 spring.rabbitmq.publisher-returns 确定;

关键代码

配置类

    @Bean
    public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
        
        // 使用jackson 消息转换器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
        
        // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
        
        // 消息确认,yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("消息发送到exchange失败,原因: {}", cause);
            }
        });
        return rabbitTemplate;
    }
    
    
    /**
     * 声明Direct交换机 支持持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }

    /**
     * 声明一个队列 支持持久化.
     *
     * @return the queue
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").build();
    }

    /**
     * 通过绑定键 将指定队列绑定到一个指定的交换机 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue,
                       @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    }


生产者

    // 发送消息
    rabbitTemplate.setExchange(exchange);
    rabbitTemplate.setRoutingKey(routeKey);
    rabbitTemplate.convertAndSend(message);
    
    
    // 发送消息
    rabbitTemplate.convertAndSend(exchange, routeKey, message);
    

    // 发送消息:为消息设置关联数据???
    CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend(exchange, routeKey, message, correlationData);


    // 发送消息:为消息设置过期时间
    amqpTemplate.convertAndSend(exchange, routeKey, message, 
            new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //给消息设置延迟毫秒值
                    message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                    return message;
                }
            });


消费者

    // 基础注解,指定queue的名称,可以多个。除 simple/Work 模式外,都需要配置类来配置queue、exchange及他绑定关系
    @RabbitListener(queues = "queue")
    @RabbitHandler
    public void processSimpleMsg(String message) {
        System.out.println("########################received simple" + message);
    }


   // 如果不想使用配置类,可以直接注解通过 bindings,绑定,spring 会根据注解生成绑定
   // ps:如果已有同名称的类。不会覆盖。会影响功能
    @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "queue", durable = "true"),
                                                exchange = @Exchange(value = "exchange", type = "direct"), 
                                                key = {"routeKey1","routeKey2"}) 
                               })
    @RabbitHandler
    public void processDirectMsg(String message) {
        System.out.println("########################received" + message);
    }

常用注解说明

@Exchange

@Exchange 是声明交换及交换机的一些属性:

@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Exchange {

    String TRUE = "true";

    String FALSE = "false";

    /**
     * @return the exchange name.
     */
    @AliasFor("name")
    String value() default "";

    /**
     * @return the exchange name.
     * @since 2.0
     */
    @AliasFor("value")
    String name() default "";

    /**
     * 交换机类型,默认DIRECT
     */
    String type() default ExchangeTypes.DIRECT;

    /**
     * 是否持久化
     */
    String durable() default TRUE;

    /**
     * 是否自动删除
     */
    String autoDelete() default FALSE;

    /**
     * @return the arguments to apply when declaring this exchange.
     * @since 1.6
     */
    Argument[] arguments() default {};
}

@Queue

@Queue 是声明队列及队列的一些属性,主要参数如下:

@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface Queue {
    /**
     * @return the queue name or "" for a generated queue name (default).
     */
    @AliasFor("name")
    String value() default "";

    /**
     * @return the queue name or "" for a generated queue name (default).
     * @since 2.0
     */
    @AliasFor("value")
    String name() default "";

    /**
     * 是否持久化
     */
    String durable() default "";

    /**
     * 是否独享、排外的.
     */
    String exclusive() default "";

    /**
     * 是否自动删除;
     */
    String autoDelete() default "";

    /**
     * 队列的其他属性参数
     *(1)x-message-ttl:消息的过期时间,单位:毫秒;
     *(2)x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
     *(3)x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
     *(4)x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
     *(5)x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
     *(6)x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
     *(7)x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
     *(8)x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
     *(9)x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
     *(10)x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
     *(11)x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
     */
    Argument[] arguments() default {};
}

@QueueBinding

@QueueBinding作用就是将队列和交换机进行绑定,主要有以下三个参数:

@Target({})
@Retention(RetentionPolicy.RUNTIME)
public @interface QueueBinding {
    /**
     * @return the queue.
     */
    Queue value();

    /**
     * @return the exchange.
     */
    Exchange exchange();

    /**
     * @return the routing key or pattern for the binding.
     * Multiple elements will result in multiple bindings.
     */
    String[] key() default {};
}

使用自定义消息类型

前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义Java对象类型的数据。

定义一个实体对象

package com.tizi365.rabbitmq.domain;

import java.io.Serializable;
import lombok.Data;

// 博客内容
@Data
public class Blog implements Serializable {
    // id
    private Integer id;
    // 标题
    private String title;
}

发送自定义类型消息

Blog blog = new Blog();
blog.setId(100);
blog.setTitle("Tizi365 RabbitMQ教程");

// 发送消息
template.convertAndSend(helloQueue.getName(), blog);

接收自定义类型消息

@RabbitHandler
// 方法参数改为自定义消息类型即可
public void receive(Blog msg) {
    System.out.println("消费者 - 收到消息 '" + msg.getTitle() + "'");
}

使用Json序列化消息内容

RabbitMQ 发送Java实体对象数据的时候,默认使用JDK的对象序列化工具。我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。


修改以前的配置类,增加下面配置, 使用Jackson json解析器对消息数据进行序列化和反序列化。

@Bean
public Jackson2JsonMessageConverter messageConverter() {
   // 设置默认消息转换器
   return new Jackson2JsonMessageConverter();
}

集成示例

  1. maven
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.6</version>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>com.vaadin.external.google</groupId>
                    <artifactId>android-json</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
    
  2. application.yml
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: spring
        password: 123456
        publisher-confirms: true #支持发布确认
        publisher-returns: true  #支持发布返回
        listener:
          simple:
            acknowledge-mode: manual #采用手动应答
            concurrency: 1 #指定最小的消费者数量
            max-concurrency: 1 #指定最大的消费者数量
            retry:
              enabled: true #是否支持重试
    
  3. 配置类
    • 定制模版类、声明交换机、队列、绑定交换机到队列;
    @Configuration
    public class RabbitConfig {
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 定制化amqp模版      可根据需要定制多个
         * <p>
         * <p>
         * 此处为模版类定义 Jackson 消息转换器
         * ConfirmCallback 接口用于实现消息发送到 RabbitMQ 交换器后接收 ack 回调   即消息发送到exchange  ack
         * ReturnCallback 接口用于实现消息发送到 RabbitMQ 交换器,但无相应队列与交换器绑定时的回调  即消息发送不到任何一个队列中  ack
         *
         * @return the amqp template
         */
        // @Primary
        @Bean
        public AmqpTemplate amqpTemplate() {
            Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
            
            // 使用jackson 消息转换器
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.setEncoding("UTF-8");
            
            // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                String correlationId = message.getMessageProperties().getCorrelationIdString();
                log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
            });
            
            // 消息确认,yml需要配置 publisher-confirms: true
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.debug("消息发送到exchange成功,id: {}", correlationData.getId());
                } else {
                    log.debug("消息发送到exchange失败,原因: {}", cause);
                }
            });
            return rabbitTemplate;
        }
    
        /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */
    
        /**
         * 声明Direct交换机 支持持久化.
         *
         * @return the exchange
         */
        @Bean("directExchange")
        public Exchange directExchange() {
            return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
        }
    
        /**
         * 声明一个队列 支持持久化.
         *
         * @return the queue
         */
        @Bean("directQueue")
        public Queue directQueue() {
            return QueueBuilder.durable("DIRECT_QUEUE").build();
        }
    
        /**
         * 通过绑定键 将指定队列绑定到一个指定的交换机 .
         *
         * @param queue    the queue
         * @param exchange the exchange
         * @return the binding
         */
        @Bean
        public Binding directBinding(@Qualifier("directQueue") Queue queue,
                                     @Qualifier("directExchange") Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
        }
    
        /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */
    
        /**
         * 声明 fanout 交换机.
         *
         * @return the exchange
         */
        @Bean("fanoutExchange")
        public FanoutExchange fanoutExchange() {
            return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
        }
    
        /**
         * Fanout queue A.
         *
         * @return the queue
         */
        @Bean("fanoutQueueA")
        public Queue fanoutQueueA() {
            return QueueBuilder.durable("FANOUT_QUEUE_A").build();
        }
    
        /**
         * Fanout queue B .
         *
         * @return the queue
         */
        @Bean("fanoutQueueB")
        public Queue fanoutQueueB() {
            return QueueBuilder.durable("FANOUT_QUEUE_B").build();
        }
    
        /**
         * 绑定队列A 到Fanout 交换机.
         *
         * @param queue          the queue
         * @param fanoutExchange the fanout exchange
         * @return the binding
         */
        @Bean
        public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue,
                                @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    
        /**
         * 绑定队列B 到Fanout 交换机.
         *
         * @param queue          the queue
         * @param fanoutExchange the fanout exchange
         * @return the binding
         */
        @Bean
        public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue,
                                @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    }
    
  4. 生产者
    @Service
    public class SenderService {
        private Logger logger = LoggerFactory.getLogger(this.getClass());
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 测试广播模式.
         *
         * @param p the p
         * @return the response entity
         */
        public void broadcast(String p) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
        }
    
        /**
         * 测试Direct模式.
         *
         * @param p the p
         * @return the response entity
         */
        public void direct(String p) {
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData);
        }
    }
    
  5. 消费者
    @Component
    public class Receiver {
        private static final Logger log = LoggerFactory.getLogger(Receiver.class);
    
        /**
         * FANOUT广播队列监听一.
         *
         * @param message the message
         * @param channel the channel
         * @throws IOException the io exception  这里异常需要处理
         */
        @RabbitListener(queues = {"FANOUT_QUEUE_A"})
        public void on(Message message, Channel channel) throws IOException {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.debug("FANOUT_QUEUE_A " + new String(message.getBody()));
        }
    
        /**
         * FANOUT广播队列监听二.
         *
         * @param message the message
         * @param channel the channel
         * @throws IOException the io exception   这里异常需要处理
         */
        @RabbitListener(queues = {"FANOUT_QUEUE_B"})
        public void t(Message message, Channel channel) throws IOException {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.debug("FANOUT_QUEUE_B " + new String(message.getBody()));
        }
    
        /**
         * DIRECT模式.
         *
         * @param message the message
         * @param channel the channel
         * @throws IOException the io exception  这里异常需要处理
         */
        @RabbitListener(queues = {"DIRECT_QUEUE"})
        public void message(Message message, Channel channel) throws IOException {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
            log.debug("DIRECT " + new String(message.getBody()));
        }
    }