查看“RabbitMQ:SpringBoot集成”的源代码
←
RabbitMQ:SpringBoot集成
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:RabbitMQ]] == 关于 == 目前 Java 操作 RabbitMQ 主要使用 Springboot 的 '''spring-boot-starter-amqp''' 包, 其实就是使用 '''Spring AMQP''' 操作队列。 * 无论使用 RabbitMQ 那种工作模式,区别就是使用的交换机(Exchange)类型和路由参数不一样。 ** 简单队列、Work模式,底层会使用默认的交换机(Direct交换机); === Maven依赖 === <syntaxhighlight lang="xml" highlight=""> <dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency> </syntaxhighlight> === 配置RabbitMQ === 修改'''application.yml'''配置: <syntaxhighlight lang="xml" highlight=""> spring: rabbitmq: # rabbitMQ服务器地址 host: localhost port: 5672 username: guest password: guest </syntaxhighlight> == 快速入门(简单队列) == Java RabbitMQ最简单的队列模式就是一个生产者和一个消费者。 : [[File:RabbitMQ工作模式:简单队列.png|400px]] === 配置队列 === 通过 springboot 的 '''configuration 类'''配置队列: <syntaxhighlight lang="java" highlight="7,9,12"> package com.tizi365.rabbitmq.config; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class QueueConfig { @Bean public Queue helloQueue() { // 声明队列, 队列名需要唯一 return new Queue("hello"); } } </syntaxhighlight> * 可以根据业务需要定义多个队列,Queue name(队列名)和 Bean id(此处即方法名)不一样即可。 *: 如上,Bean id:“helloQueue”;Queue name:“hello”; === 发送消息 === 发送消息需要用到 '''RabbitTemplate 类''',springboot已经帮我们初始化,注入实例即可: <syntaxhighlight lang="Java" highlight="10,14,18,19,30"> package com.tizi365.rabbitmq.service; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @Service public class SendService { // 注入RabbitTemplate实例 @Autowired private RabbitTemplate template; // 注入前面定义的队列 @Autowired @Qualifier("helloQueue") private Queue helloQueue; // 为了演示,这里使用spring内置的定时任务,定时发送消息(每秒发送一条消息) @Scheduled(fixedDelay = 1000, initialDelay = 1000) public void send() { // 消息内容 String message = "Hello World!"; // 发送消息 // 第一个参数是路由参数,这里使用队列名作为路由参数 // 第二个参数是消息内容,支持任意类型,只要支持序列化 template.convertAndSend(helloQueue.getName(), message); System.out.println("发送消息 '" + message + "'"); } } </syntaxhighlight> * 这里没有直接使用交换机(exchange),底层会使用默认的交换机('''Direct交换机''')。 === 接收消息 === 接收消息需要用 '''@RabbitListener(queues = "xxx") 注解'''指定接受哪个 Queue 的消息: * “@RabbitListener”注解可以作用在类上,也可以作用在方法上; ** 如果其定义在类上,则需要配合“'''@RabbitHandler'''”注解标记由那个类方法执行消息处理。 <syntaxhighlight lang="Java" highlight="9,12"> package com.tizi365.rabbitmq.listener; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component // 声明消息监听器,通过queues参数指定监听的队列,需要跟前面的队列名保持一致 @RabbitListener(queues = "hello") public class HelloListener { // 使用RabbitHandler标记消息处理器,用来执行消息处理逻辑 @RabbitHandler public void receive(String msg) { System.out.println("消费者 - 收到消息 '" + msg + "'"); } } </syntaxhighlight> == Work模式 == Java RabbitMQ Work模式,就是配置多个消费者消费一个队列的消息,可以提高消息的并发处理速度: : [[File:RabbitMQ工作模式:工作队列(Work模式).png|400px]] === 接收消息(配置多个消费者) === 通过'''@RabbitListener'''注解,配置'''concurrency'''参数即可实现。 如下,启动10个消费者并发处理消息: <syntaxhighlight lang="Java" highlight="10"> package com.tizi365.rabbitmq.listener; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component // 声明消息监听器,通过queues参数指定监听的队列 // 关键参数:concurrency 代表当前监听器需要启动多少个消费者,类型是字符串 @RabbitListener(queues = "hello", concurrency = "10") public class HelloListener { // 使用RabbitHandler标记消息处理器,用来执行消息处理逻辑 @RabbitHandler public void receive(String msg) { System.out.println("消费者 - 收到消息 '" + msg + "'"); } } </syntaxhighlight> == 发布订阅模式 == Java RabbitMQ发布订阅模式(广播模式、fanout模式),使用的交换机类型为'''FanoutExchange''',就是一个生产者发送的消息会被多个队列的消费者处理: : [[File:RabbitMQ工作模式:发布订阅模式(广播模式、fanout模式).png|400px]] === 配置:交换机、队列 === 通过 Springboot 配置类: # 定义交换机(FanoutExchange); # 定义队列(Queue); # 将队列绑定到目标交换机上(Binding); <syntaxhighlight lang="Java" highlight="11,17,23,29,35"> package com.tizi365.rabbitmq.config; import org.springframework.amqp.core.FanoutExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class QueueConfig { // 定义交换机 @Bean public FanoutExchange fanout() { // 参数为交换机名字,不能重复 return new FanoutExchange("tizi365.fanout"); } @Bean public Queue queue1() { // 定义队列1 return new Queue("tizi365.fanout.queue1"); } @Bean public Queue queue2() { // 定义队列2 return new Queue("tizi365.fanout.queue2"); } @Bean public Binding binding1(FanoutExchange fanout, Queue queue1) { // 定义一个绑定关系,将队列1绑定到fanout交换机上 return BindingBuilder.bind(queue1).to(fanout); } @Bean public Binding binding2(FanoutExchange fanout, Queue queue2) { // 定义一个绑定关系,将队列2绑定到fanout交换机上 return BindingBuilder.bind(queue2).to(fanout); } } </syntaxhighlight> === 发送消息 === 将消息发送给交换机,由交换机根据路由规则投递消息到对应的队列: <syntaxhighlight lang="Java" highlight="14,25"> package com.tizi365.rabbitmq.service; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @Service public class SendService { @Autowired private RabbitTemplate template; @Autowired private FanoutExchange fanout; // 为演示,这里使用定时任务,每秒发送一条消息 @Scheduled(fixedDelay = 1000, initialDelay = 1000) public void send() { // 消息内容 String message = "Hello World!"; // 发送消息 // 第一个参数是交换机名字 // 第二个参数是路由参数,fanout交换机会忽略路由参数,所以不用设置 // 第三个参数是消息内容,支持任意类型,只要支持序列化 template.convertAndSend(fanout.getName(), "", message); System.out.println("发送消息 '" + message + "'"); } } </syntaxhighlight> === 接收消息 === 通过'''@RabbitListener'''注解定义消息监听器,消费指定队列的消息: <syntaxhighlight lang="Java" highlight="10,16"> package com.tizi365.rabbitmq.listener; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; // 将当前类交给Spring管理 @Component public class DemoListener { // 定义一个监听器,通过queues参数指定监听那个队列 @RabbitListener(queues = "tizi365.fanout.queue1") public void receive1(String msg) { System.out.println("收到队列1的消息 = " + msg); } // 定义一个监听器,通过queues参数指定监听那个队列 @RabbitListener(queues = "tizi365.fanout.queue2") public void receive2(String msg) { System.out.println("收到队列2的消息 = " + msg); } } </syntaxhighlight> * 每一条消息,都会分发给所有绑定到当前交换机的队列中,消息会被上面的两个方法分别处理。 ==== P.S. :全注解方式定义队列监听器 ==== 直接通过 @RabbitListener 注解的 bindings 参数定义绑定关系、队列、交换机: * 不需要前面的 springboot 配置类定义:交换机、队列和绑定关系; <syntaxhighlight lang="Java" highlight=""> @RabbitListener( bindings = { @QueueBinding( value = @Queue(name = "tizi365.fanout.queue3", durable = "true"), exchange = @Exchange(name = "tizi365.fanout", durable = "true",type = ExchangeTypes.FANOUT) ) } ) public void receive3(String msg) { System.out.println("收到队列3的消息 = " + msg); } </syntaxhighlight> 说明: * QueueBinding 注解:定义队列和交换机的绑定关系:“value”参数用于定义队列,“exchange”用于定义交换机; * Queue 注解:定义一个队列:“name”参数定义队列名(需要唯一),“durable”参数表示是否需要持久化; * Exchange 注解:定义一个交换机:“name”参数定义交换机的名字,“type”参数表示交换机的类型; == 路由模式 == === 配置:交换机、队列 === 通过 Springboot 配置类: # 定义交换机(DirectExchange); # 定义队列(Queue); # 将队列绑定到目标交换机上(Binding); <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> == 使用自定义消息类型 == 前面我们发送的消息是一个字符串类型,实际业务中我们更愿意直接发送各种自定义'''Java对象类型'''的数据。 === 定义一个实体对象 === <syntaxhighlight lang="Java" highlight=""> package com.tizi365.rabbitmq.domain; import java.io.Serializable; import lombok.Data; // 博客内容 @Data public class Blog implements Serializable { // id private Integer id; // 标题 private String title; } </syntaxhighlight> === 发送自定义类型消息 === <syntaxhighlight lang="Java" highlight=""> Blog blog = new Blog(); blog.setId(100); blog.setTitle("Tizi365 RabbitMQ教程"); // 发送消息 template.convertAndSend(helloQueue.getName(), blog); </syntaxhighlight> === 接收自定义类型消息 === <syntaxhighlight lang="Java" highlight=""> @RabbitHandler // 方法参数改为自定义消息类型即可 public void receive(Blog msg) { System.out.println("消费者 - 收到消息 '" + msg.getTitle() + "'"); } </syntaxhighlight> === 使用Json序列化消息内容 === RabbitMQ 发送Java实体对象数据的时候,默认使用'''JDK的对象序列化工具'''。我们可以改成使用json格式对数据进行序列化,这样可以支持其他类型的语言消费Java发送出去的消息,同时也让消息格式更具有可读性。 修改以前的配置类,增加下面配置, 使用'''Jackson json解析器'''对消息数据进行序列化和反序列化。 <syntaxhighlight lang="Java" highlight=""> @Bean public Jackson2JsonMessageConverter messageConverter() { // 设置默认消息转换器 return new Jackson2JsonMessageConverter(); } </syntaxhighlight>
返回至“
RabbitMQ:SpringBoot集成
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息