“RabbitMQ:Java(工作模式)”的版本间差异
跳到导航
跳到搜索
(建立内容为“category:RabbitMQ category:Java == 关于 == <syntaxhighlight lang="Java" highlight=""> </syntaxhighlight> <syntaxhighlight lang="Java" highlight="…”的新页面) |
无编辑摘要 |
||
第3行: | 第3行: | ||
== 关于 == | == 关于 == | ||
代码示例来自官网:[https://www.rabbitmq.com/getstarted.html https://www.rabbitmq.com/getstarted.html] | |||
Maven 依赖: | |||
<syntaxhighlight lang="xml" highlight=""> | |||
<dependencies> | |||
<dependency> | |||
<groupId>com.rabbitmq</groupId> | |||
<artifactId>amqp-client</artifactId> | |||
<version>3.6.0</version> | |||
</dependency> | |||
... ... | |||
</dependencies> | |||
</syntaxhighlight> | |||
: [[File:RabbitMQ:Java示例.png|400px]] | |||
== 关键代码 == | |||
* '''connection''': | |||
*: <syntaxhighlight lang="Java" highlight=""> | |||
ConnectionFactory factory = new ConnectionFactory(); | |||
factory.setHost("localhost"); | |||
factory.setPort(5672); | |||
factory.setUsername("guest"); | |||
factory.setPassword("guest"); | |||
factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 | |||
//创建与RabbitMQ服务的TCP连接 | |||
connection = factory.newConnection(); | |||
</syntaxhighlight> | |||
* '''channel''': | |||
*: <syntaxhighlight lang="Java" highlight=""> | |||
//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 | |||
channel = connection.createChannel(); | |||
</syntaxhighlight> | |||
** channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。 | |||
* '''exchange'''、'''queue''': | |||
*: <syntaxhighlight lang="Java" highlight=""> | |||
/** 声明交换机, | |||
* 参数明细: | |||
* 1、String exchange:交换机名称 | |||
* 2、BuiltinExchangeType type:交换机类型 | |||
*/ | |||
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); | |||
/** 声明队列 | |||
* 参数明细: | |||
* 1、String queue:队列名称 | |||
* 2、boolean durable:是否持久化 | |||
* 3、boolean exclusive:是否独占此队列 | |||
* 4、boolean autoDelete:队列不用是否自动删除 | |||
* 5、Map<String, Object> arguments:参数 | |||
*/ | |||
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); | |||
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); | |||
/** 绑定 | |||
* 参数明细 | |||
* 1、String queue:队列名称 | |||
* 2、String exchange:交换机名称 | |||
* 3、String routingKey:路由key | |||
*/ | |||
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); | |||
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS); | |||
</syntaxhighlight> | |||
** 根据不同的工作模式,不一定需要声明交换机、队列,或是绑定: | |||
**# 简单队列: | |||
**# Work模式: | |||
**# 发布订阅: | |||
**# 路由模式: | |||
**# 主题模式: | |||
* 消息发送: | |||
*: <syntaxhighlight lang="Java" highlight=""> | |||
for (int i=0;i<10;i++){ | |||
String message = "sms inform to user"+i; | |||
/** 向交换机发送消息 | |||
* 参数明细 | |||
* 1、String exchange:交换机名称 | |||
* 2、String routingKey:路由key | |||
* 3、BasicProperties props:消息属性 | |||
* 4、byte[] body:消息内容 | |||
*/ | |||
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); | |||
System.out.println("Send Message is:'" + message + "'"); | |||
} | |||
</syntaxhighlight> | |||
** “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange; | |||
* 消息接收: | |||
*: <syntaxhighlight lang="Java" highlight=""> | |||
/** | |||
* 定义消费方法,写法一: | |||
* | |||
* DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { | |||
* @Override | |||
* public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | |||
* long deliveryTag = envelope.getDeliveryTag(); | |||
* String exchange = envelope.getExchange(); | |||
* // 消息内容 | |||
* String message = new String(body, "utf-8"); | |||
* System.out.println(message); | |||
* } | |||
* }; | |||
*/ | |||
/** | |||
* 定义消费方法,写法二:(Lambda表达式,手动确认消息) | |||
* | |||
* DeliverCallback deliverCallback = (consumerTag, delivery) -> { | |||
* String message = new String(delivery.getBody(), "UTF-8"); | |||
* | |||
* try { | |||
* // 调用消息处理 | |||
* doWork(message); | |||
* } catch (InterruptedException e) { | |||
* e.getMessage(); | |||
* } finally { | |||
* System.out.println(" [x] 接收 '" + message + "'"); | |||
* // 手动确认消息 | |||
* channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); | |||
* } | |||
* }; | |||
// 定义消费方法,写法三: | |||
DeliverCallback deliverCallback = (consumerTag, delivery) -> { | |||
String message = new String(delivery.getBody(), "UTF-8"); | |||
System.out.println(" [x] 接收 '" + message + "'"); | |||
}; | |||
/** | |||
* 监听队列, , | |||
* 参数明细 | |||
* 1、String queue:队列名称 | |||
* 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 | |||
* 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 | |||
*/ | |||
channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback); | |||
</syntaxhighlight> | |||
** 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行) | |||
== Hello World == | |||
<syntaxhighlight lang="Java" highlight=""> | <syntaxhighlight lang="Java" highlight=""> |
2021年5月24日 (一) 21:28的版本
关于
代码示例来自官网:https://www.rabbitmq.com/getstarted.html
Maven 依赖:
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.0</version>
</dependency>
... ...
</dependencies>
关键代码
- connection:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建与RabbitMQ服务的TCP连接 connection = factory.newConnection();
- channel:
//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel();
- channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
- exchange、queue:
/** 声明交换机, * 参数明细: * 1、String exchange:交换机名称 * 2、BuiltinExchangeType type:交换机类型 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** 声明队列 * 参数明细: * 1、String queue:队列名称 * 2、boolean durable:是否持久化 * 3、boolean exclusive:是否独占此队列 * 4、boolean autoDelete:队列不用是否自动删除 * 5、Map<String, Object> arguments:参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** 绑定 * 参数明细 * 1、String queue:队列名称 * 2、String exchange:交换机名称 * 3、String routingKey:路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
- 根据不同的工作模式,不一定需要声明交换机、队列,或是绑定:
- 简单队列:
- Work模式:
- 发布订阅:
- 路由模式:
- 主题模式:
- 消息发送:
for (int i=0;i<10;i++){ String message = "sms inform to user"+i; /** 向交换机发送消息 * 参数明细 * 1、String exchange:交换机名称 * 2、String routingKey:路由key * 3、BasicProperties props:消息属性 * 4、byte[] body:消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); }
- “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
- 消息接收:
/** * 定义消费方法,写法一: * * DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { * @Override * public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { * long deliveryTag = envelope.getDeliveryTag(); * String exchange = envelope.getExchange(); * // 消息内容 * String message = new String(body, "utf-8"); * System.out.println(message); * } * }; */ /** * 定义消费方法,写法二:(Lambda表达式,手动确认消息) * * DeliverCallback deliverCallback = (consumerTag, delivery) -> { * String message = new String(delivery.getBody(), "UTF-8"); * * try { * // 调用消息处理 * doWork(message); * } catch (InterruptedException e) { * e.getMessage(); * } finally { * System.out.println(" [x] 接收 '" + message + "'"); * // 手动确认消息 * channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); * } * }; // 定义消费方法,写法三: DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; /** * 监听队列, , * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback);
- 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)
Hello World