查看“RabbitMQ:Java(工作模式)”的源代码
←
RabbitMQ:Java(工作模式)
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:RabbitMQ]] [[category:Java]] == 关于 == 代码示例来自官网:[https://www.rabbitmq.com/getstarted.html https://www.rabbitmq.com/getstarted.html] Maven 依赖: <syntaxhighlight lang="xml" highlight=""> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.0</version> </dependency> ... ... </dependencies> </syntaxhighlight> : [[File:RabbitMQ:Java示例.png|200px]] == 关键代码 == * '''connection''': *: <syntaxhighlight lang="Java" highlight=""> ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建与RabbitMQ服务的TCP连接 connection = factory.newConnection(); </syntaxhighlight> * '''channel''': *: <syntaxhighlight lang="Java" highlight=""> //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel(); </syntaxhighlight> ** channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。 * '''exchange'''、'''queue'''、'''binding''': *: <syntaxhighlight lang="Java" highlight=""> /** 声明交换机, * 参数明细: * 1、String exchange:交换机名称 * 2、BuiltinExchangeType type:交换机类型 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** 声明队列 * 参数明细: * 1、String queue:队列名称 * 2、boolean durable:是否持久化 * 3、boolean exclusive:是否独占此队列 * 4、boolean autoDelete:队列不用是否自动删除 * 5、Map<String, Object> arguments:参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** 绑定 * 参数明细 * 1、String queue:队列名称 * 2、String exchange:交换机名称 * 3、String routingKey:路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); /** * 临时队列 */ String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_ROUTING_INFORM, ""); </syntaxhighlight> * '''消息发送''': *: <syntaxhighlight lang="Java" highlight=""> for (int i=0;i<10;i++){ String message = "sms inform to user"+i; /** 向交换机发送消息 * 参数明细 * 1、String exchange:交换机名称 * 2、String routingKey:路由key * 3、BasicProperties props:消息属性 * 4、byte[] body:消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); } </syntaxhighlight> ** “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange; * '''消息接收''': *: <syntaxhighlight lang="Java" highlight=""> /** * 定义消费方法 */ DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); /** * try { * // 调用消息处理 * doWork(message); * } catch (InterruptedException e) { * e.getMessage(); * } finally { * System.out.println(" [x] 结束"); * // 手动确认消息 * channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); * } */ }; /** * 监听队列 * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 * 4、 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); </syntaxhighlight> *: <syntaxhighlight lang="Java" highlight=""> /** * 定义消费方法 */ DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); String exchange = envelope.getExchange(); // 消息内容 String message = new String(body, "utf-8"); System.out.println(message); } }; /** * 监听队列 * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer); </syntaxhighlight> ** 使用额外的 '''DeliverCallback''' 接口来缓冲服务器推送给我们的消息。 ** 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行) === 代码总结 === 根据不同的工作模式,不一定需要声明 exchange、queue,或 binding: # '''简单队列''':直接与 queue 交互(使用无名 exchange:即,默认的 direct) #* 生产者:声明 queue; #* 消费者:声明 queue; #* 发送消息:指定 routingKey,而不指定 exchange; # '''Work模式''':(同上) # '''发布订阅''':消息发送到绑定的每一个队列; #* 生产者:声明 exchange; #* 消费者:声明 exchange,声明 queue('''临时队列'''),且 binding(不指定 routingKey); #* 发送消息:指定 exchange,而不指定 routingKey;(消息发送到每一个队列,所以不需要声明 queue) # '''路由模式''':根据 routingKey 相等,消息发送到绑定的队列; #* 生产者:声明 exchange; #* 消费者:声明 exchange,声明 queue('''临时队列'''),且 binding(指定 routingKey); #* 发送消息:指定 exchange,指定 routingKey; # '''主题模式''':根据 routingKey 匹配,消息发送到绑定的队列; #* 生产者:声明 exchange; #* 消费者:声明 exchange,声明 queue('''临时队列'''),且 binding(指定 routingKey 通配符); #* 发送消息:指定 exchange,指定 routingKey; == Hello World == # 生产者: #: <syntaxhighlight lang="Java" highlight=""> package top.codexu.rabbitmq.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] 发送 '" + message + "'"); } } } </syntaxhighlight> #* 可以使用 try-with-resource 语句,不必再显式地关闭资源;(Connection 和 Channel 都实现 java.io.Closeable) # 消费者: #: <syntaxhighlight lang="Java" highlight=""> package top.codexu.rabbitmq.hello; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } </syntaxhighlight> #* 在这里也声明了队列:因为我们可能会在发布服务器之前启动使用者,所以我们希望在尝试'''使用来自队列的消息之前确保该队列存在'''。 #* 不使用 try-with-resource 语句自动关闭通道和连接:因为我们希望在消费者异步侦听消息到达时,该过程保持活动状态。 == Work模式 == <pre> 工作队列是啥? 就是当遇到了运行耗时久的任务,并且还得等待它完成,这个时候就可以使用工作队列,把这个耗时任务发送给别的工人(消费者)进行处理,生产者可以直接得到处理完的情况。 </pre> 相关特性: # '''消息确认''':消费者发送通知告诉 MQ 处理完成了; # '''消息持久''':遇到了宕机后,会把消息给缓存或保存下来,使得下次启动能够不丢失(但不是百分百); # '''循环调度''':若开着两个或三个消费者的时候,当多个消息要接收,MQ 是会自动循环找下一个,避免一直重复同一个或几个; # '''公平派遣''':当有两个消费者的时候,若一个消费者一直再累死累活,另外一个逍遥自在,这是不利于效率提升的,故可以通过设置,限制若A忙就找B去。 # 生产者: #: <syntaxhighlight lang="Java" highlight=""> package top.codexu.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.nio.charset.StandardCharsets; public class NewTask { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 消息持久化(非百分百保证)。前提:队列第一次声明就是持久的,并且在生产者端和消费者端都要声明为true。 boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); for (int i = 0; i <= 5; i++) { String message = "第" + i + "个消息."; // MessageProperties.PERSISTENT_TEXT_PLAIN:将消息标记为持久性,确保即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] 发送 '" + message + "'"); } } } } </syntaxhighlight> #* 将消息标记为持久性并'''不能完全保证消息不会丢失''': #*: 尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 接受消息但尚未保存消息的时间窗口仍然很短。而且,RabbitMQ 并不是对每条消息都执行“fsync(2)”:它可能只是保存到缓存中,而不是真正写入磁盘。 #*: 持久性保证不强,但对于简单任务队列来说已经足够了。如果你需要一个更强有力的保证,那么你可以使用“'''publisher confirms'''”。 # 消费者: #: <syntaxhighlight lang="Java" highlight=""> package top.codexu.rabbitmq.work; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 消息持久化(非百分百保证)。消费者端同发布者一样设置为 true boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); System.out.println(" [*] 等待消息."); // 公平派遣设置:一次仅接受一条未经确认的消息,为了实现公平的安排(其他人闲着就安排其他人) channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.getMessage(); } finally { System.out.println(" [x] 结束"); // 执行完成,返回确认消息。 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 消息确认:目的就是为了不丢失消息,使得确保每个消息有执行成功。(网络抖动也不怕,会找别人执行) boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } } </syntaxhighlight> RabbitMQ 查看当前队列有那些消息还未确定: <syntaxhighlight lang="bash" highlight=""> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages_ready messages_unacknowledged work 0 3 hello 0 0 </syntaxhighlight> (messages_ready:等待接收的消息;messages_unacknowledged:已接受但未确认的消息) == 发布订阅 == fanout 交换非常简单,它只是'''将接收到的所有消息广播到它知道的所有队列'''。 不同于以上使用具有特定名称的队列(在生产者和消费者之间共享队列时,为队列指定名称非常重要),发布订阅模式中,我们希望听到所有日志消息,而不仅仅是其中的一个子集。我们也只对当前的流消息感兴趣,而不是对旧消息感兴趣。要解决这个问题,我们就需要'''临时队列''': # 一个新的、空的,随机名称的队列; # 一旦断开连接,队列就被自动删除。 在 Java客户机中,当我们不向“queueDeclare()”提供任何参数时,将创建一个非持久的、独占的、自动删除的队列,并使用生成的名称: <syntaxhighlight lang="Java" highlight=""> String queueName = channel.queueDeclare().getQueue(); // queueName包含一个随机队列名称,如:amq.gen-JzTY20BRgKO-hjmuj0wlg </syntaxhighlight> # 生产者: #: <syntaxhighlight lang="Java" highlight=""> package top.codexu.rabbitmq.publicsubscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "info: Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] 发送 '" + message + "'"); } } } </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> package top.codexu.rabbitmq.publicsubscribe; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } </syntaxhighlight> RabbitMQ 查看绑定关系: <syntaxhighlight lang="bash" highlight=""> rabbitmqctl list_bindings </syntaxhighlight> == 路由模式 == # 生产者: #: <syntaxhighlight lang="Java" highlight=""> package top.codexu.rabbitmq.routing; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = "warning"; String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } } </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> package top.codexu.rabbitmq.routing; import com.rabbitmq.client.*; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName1 = channel.queueDeclare().getQueue(); String queueName2 = channel.queueDeclare().getQueue(); String[] severities; severities = {"info", "warning", "error"}; for (String severity : severities1) { channel.queueBind(queueName1, EXCHANGE_NAME, severity); } severities = {"info"}; for (String severity : severities2) { channel.queueBind(queueName2, EXCHANGE_NAME, severity); } System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 队列1接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { }); DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 队列2接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { }); } } </syntaxhighlight> #* 如上,一个 queue 可以绑定多个 routingKey。 == 主题模式 == # 生产者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String[] routingKeys = new String[]{"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"}; for(String severity : routingKeys){ String message = "From " + severity + " routingKey' s message!"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } } //.. } </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName1 = channel.queueDeclare().getQueue(); String queueName2 = channel.queueDeclare().getQueue(); String[] routingKeys; routingKeys = new String[]{"*.orange.*"}; for (String bindingKey : routingKeys) { channel.queueBind(queueName1, EXCHANGE_NAME, bindingKey); } routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; for (String bindingKey : routingKeys) { channel.queueBind(queueName2, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { }); DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { }); } } </syntaxhighlight> == 标题文字 == # 生产者: #: <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> # 生产者: #: <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> # 生产者: #: <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight>
返回至“
RabbitMQ:Java(工作模式)
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息