查看“RabbitMQ:SpringBoot集成”的源代码
←
RabbitMQ:SpringBoot集成
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:RabbitMQ]] [[category:SpringBoot]] == SpringBoot 集成 RabbitMQ == === 依赖 === <syntaxhighlight lang="xml" highlight=""> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </syntaxhighlight> === RabbitMQ 配置 === ('''application.yml'''中) 简单配置: <syntaxhighlight lang="yaml" highlight=""> spring: rabbitmq: host: 127.0.0.1 #ip port: 5672 #端口 username: guest #账号 password: guest #密码 </syntaxhighlight> P.S.:'''全量配置说明''' <syntaxhighlight lang="yaml" highlight=""> spring: rabbitmq: host: 127.0.0.1 #ip port: 5672 #端口 username: guest #账号 password: guest #密码 virtualHost: #链接的虚拟主机 addresses: 127.0.0.1:5672 #多个以逗号分隔,与host功能一样。 requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s publisherConfirms: true #发布确认机制是否启用 publisherReturns: #发布返回是否启用 connectionTimeout: #链接超时。单位ms。0表示无穷大不超时 ### ssl相关 ssl: enabled: #是否支持ssl keyStore: #指定持有SSL certificate的key store的路径 keyStoreType: #key store类型 默认PKCS12 keyStorePassword: #指定访问key store的密码 trustStore: #指定持有SSL certificates的Trust store trustStoreType: #默认JKS trustStorePassword: #访问密码 algorithm: #ssl使用的算法,例如,TLSv1.1 verifyHostname: #是否开启hostname验证 ### cache相关 cache: channel: size: #缓存中保持的channel数量 checkoutTimeout: #当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel connection: mode: #连接工厂缓存模式:CHANNEL 和 CONNECTION size: #缓存的连接数,只有是CONNECTION模式时生效 ### listener listener: type: #两种类型,SIMPLE,DIRECT ## simple类型 simple: concurrency: #最小消费者数量 maxConcurrency: #最大的消费者数量 transactionSize: #指定一个事务处理的消息数量,最好是小于等于prefetch的数量 missingQueuesFatal: #是否停止容器当容器中的队列不可用 ## 与direct相同配置部分 autoStartup: #是否自动启动容器 acknowledgeMode: #表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto prefetch: #指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量 defaultRequeueRejected: #决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系) idleEventInterval: #container events发布频率,单位ms ##重试机制 retry: stateless: #有无状态 enabled: #是否开启 maxAttempts: #最大重试次数,默认3 initialInterval: #重试间隔 multiplier: #对于上一次重试的乘数 maxInterval: #最大重试时间间隔 direct: consumersPerQueue: #每个队列消费者数量 missingQueuesFatal: #...其余配置看上方公共配置 ## template相关 template: mandatory: #是否启用强制信息;默认false receiveTimeout: #`receive()`接收方法超时时间 replyTimeout: #`sendAndReceive()`超时时间 exchange: #默认的交换机 routingKey: #默认的路由 defaultReceiveQueue: #默认的接收队列 ## retry重试相关 retry: enabled: #是否开启 maxAttempts: #最大重试次数 initialInterval: #重试间隔 multiplier: #失败间隔乘数 maxInterval: #最大间隔 </syntaxhighlight> === 关键代码 === '''配置类''': <syntaxhighlight lang="Java" highlight=""> @Bean public AmqpTemplate amqpTemplate() { Logger log = LoggerFactory.getLogger(RabbitTemplate.class); // 使用jackson 消息转换器 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setEncoding("UTF-8"); // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); // 消息确认,yml需要配置 publisher-confirms: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); } else { log.debug("消息发送到exchange失败,原因: {}", cause); } }); return rabbitTemplate; } /** * 声明Direct交换机 支持持久化. * * @return the exchange */ @Bean("directExchange") public Exchange directExchange() { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); } /** * 声明一个队列 支持持久化. * * @return the queue */ @Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").build(); } /** * 通过绑定键 将指定队列绑定到一个指定的交换机 . * * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); } </syntaxhighlight> '''生产者''': <syntaxhighlight lang="Java" highlight=""> // 发送消息 rabbitTemplate.setExchange(exchange); rabbitTemplate.setRoutingKey(routeKey); rabbitTemplate.convertAndSend(message); // 发送消息 rabbitTemplate.convertAndSend(exchange, routeKey, message); // 发送消息:为消息设置关联数据??? CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(exchange, routeKey, message, correlationData); // 发送消息:为消息设置过期时间 amqpTemplate.convertAndSend(exchange, routeKey, message, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值 message.getMessageProperties().setExpiration(String.valueOf(delayTimes)); return message; } }); </syntaxhighlight> '''消费者''': <syntaxhighlight lang="java" highlight=""> // 基础注解,指定queue的名称,可以多个。除 simple/Work 模式外,都需要配置类来配置queue、exchange及他绑定关系 @RabbitListener(queues = "queue") @RabbitHandler public void processSimpleMsg(String message) { System.out.println("########################received simple" + message); } // 如果不想使用配置类,可以直接注解通过 bindings,绑定,spring 会根据注解生成绑定 // ps:如果已有同名称的类。不会覆盖。会影响功能 @RabbitListener(bindings = { @QueueBinding( value = @Queue(value = "queue", durable = "true"), exchange = @Exchange(value = "exchange", type = "direct"), key = {"routeKey1","routeKey2"}) }) @RabbitHandler public void processDirectMsg(String message) { System.out.println("########################received" + message); } </syntaxhighlight> == 集成示例 == # '''maven''': #: <syntaxhighlight lang="xml" highlight=""> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.6</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <exclusions> <exclusion> <groupId>com.vaadin.external.google</groupId> <artifactId>android-json</artifactId> </exclusion> </exclusions> </dependency> </dependencies> </syntaxhighlight> # '''application.yml''': #: <syntaxhighlight lang="yaml" highlight=""> spring: rabbitmq: host: 127.0.0.1 port: 5672 username: spring password: 123456 publisher-confirms: true #支持发布确认 publisher-returns: true #支持发布返回 listener: simple: acknowledge-mode: manual #采用手动应答 concurrency: 1 #指定最小的消费者数量 max-concurrency: 1 #指定最大的消费者数量 retry: enabled: true #是否支持重试 </syntaxhighlight> # '''配置类''': #* 定制模版类、声明交换机、队列、绑定交换机到队列; #: <syntaxhighlight lang="Java" highlight=""> @Configuration public class RabbitConfig { @Resource private RabbitTemplate rabbitTemplate; /** * 定制化amqp模版 可根据需要定制多个 * <p> * <p> * 此处为模版类定义 Jackson 消息转换器 * ConfirmCallback 接口用于实现消息发送到 RabbitMQ 交换器后接收 ack 回调 即消息发送到exchange ack * ReturnCallback 接口用于实现消息发送到 RabbitMQ 交换器,但无相应队列与交换器绑定时的回调 即消息发送不到任何一个队列中 ack * * @return the amqp template */ // @Primary @Bean public AmqpTemplate amqpTemplate() { Logger log = LoggerFactory.getLogger(RabbitTemplate.class); // 使用jackson 消息转换器 rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setEncoding("UTF-8"); // 消息发送失败返回到队列中,yml需要配置 publisher-returns: true rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { String correlationId = message.getMessageProperties().getCorrelationIdString(); log.debug("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey); }); // 消息确认,yml需要配置 publisher-confirms: true rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.debug("消息发送到exchange成功,id: {}", correlationData.getId()); } else { log.debug("消息发送到exchange失败,原因: {}", cause); } }); return rabbitTemplate; } /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */ /** * 声明Direct交换机 支持持久化. * * @return the exchange */ @Bean("directExchange") public Exchange directExchange() { return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build(); } /** * 声明一个队列 支持持久化. * * @return the queue */ @Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").build(); } /** * 通过绑定键 将指定队列绑定到一个指定的交换机 . * * @param queue the queue * @param exchange the exchange * @return the binding */ @Bean public Binding directBinding(@Qualifier("directQueue") Queue queue, @Qualifier("directExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs(); } /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */ /** * 声明 fanout 交换机. * * @return the exchange */ @Bean("fanoutExchange") public FanoutExchange fanoutExchange() { return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build(); } /** * Fanout queue A. * * @return the queue */ @Bean("fanoutQueueA") public Queue fanoutQueueA() { return QueueBuilder.durable("FANOUT_QUEUE_A").build(); } /** * Fanout queue B . * * @return the queue */ @Bean("fanoutQueueB") public Queue fanoutQueueB() { return QueueBuilder.durable("FANOUT_QUEUE_B").build(); } /** * 绑定队列A 到Fanout 交换机. * * @param queue the queue * @param fanoutExchange the fanout exchange * @return the binding */ @Bean public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } /** * 绑定队列B 到Fanout 交换机. * * @param queue the queue * @param fanoutExchange the fanout exchange * @return the binding */ @Bean public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) { return BindingBuilder.bind(queue).to(fanoutExchange); } } </syntaxhighlight> # '''生产者''': #: <syntaxhighlight lang="Java" highlight=""> @Service public class SenderService { private Logger logger = LoggerFactory.getLogger(this.getClass()); @Resource private RabbitTemplate rabbitTemplate; /** * 测试广播模式. * * @param p the p * @return the response entity */ public void broadcast(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData); } /** * 测试Direct模式. * * @param p the p * @return the response entity */ public void direct(String p) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData); } } </syntaxhighlight> # '''消费者''': #: <syntaxhighlight lang="Java" highlight=""> @Component public class Receiver { private static final Logger log = LoggerFactory.getLogger(Receiver.class); /** * FANOUT广播队列监听一. * * @param message the message * @param channel the channel * @throws IOException the io exception 这里异常需要处理 */ @RabbitListener(queues = {"FANOUT_QUEUE_A"}) public void on(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("FANOUT_QUEUE_A " + new String(message.getBody())); } /** * FANOUT广播队列监听二. * * @param message the message * @param channel the channel * @throws IOException the io exception 这里异常需要处理 */ @RabbitListener(queues = {"FANOUT_QUEUE_B"}) public void t(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("FANOUT_QUEUE_B " + new String(message.getBody())); } /** * DIRECT模式. * * @param message the message * @param channel the channel * @throws IOException the io exception 这里异常需要处理 */ @RabbitListener(queues = {"DIRECT_QUEUE"}) public void message(Message message, Channel channel) throws IOException { channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.debug("DIRECT " + new String(message.getBody())); } } </syntaxhighlight> === 常用注解说明 === == 使用自定义消息类型 == 前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义'''Java对象类型'''的数据。 === 定义一个实体对象 === <syntaxhighlight lang="Java" highlight=""> package com.tizi365.rabbitmq.domain; import java.io.Serializable; import lombok.Data; // 博客内容 @Data public class Blog implements Serializable { // id private Integer id; // 标题 private String title; } </syntaxhighlight> === 发送自定义类型消息 === <syntaxhighlight lang="Java" highlight=""> Blog blog = new Blog(); blog.setId(100); blog.setTitle("Tizi365 RabbitMQ教程"); // 发送消息 template.convertAndSend(helloQueue.getName(), blog); </syntaxhighlight> === 接收自定义类型消息 === <syntaxhighlight lang="Java" highlight=""> @RabbitHandler // 方法参数改为自定义消息类型即可 public void receive(Blog msg) { System.out.println("消费者 - 收到消息 '" + msg.getTitle() + "'"); } </syntaxhighlight> === 使用Json序列化消息内容 === RabbitMQ 发送Java实体对象数据的时候,默认使用'''JDK的对象序列化工具'''。我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。 修改以前的配置类,增加下面配置, 使用'''Jackson json解析器'''对消息数据进行序列化和反序列化。 <syntaxhighlight lang="Java" highlight=""> @Bean public Jackson2JsonMessageConverter messageConverter() { // 设置默认消息转换器 return new Jackson2JsonMessageConverter(); } </syntaxhighlight> == 延迟消息实现 == == 死信队列实现 ==
返回至“
RabbitMQ:SpringBoot集成
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息