“RabbitMQ:Java(工作模式)”的版本间差异
		
		
		
		
		
		跳到导航
		跳到搜索
		
				
		
		
	
 (建立内容为“category:RabbitMQ category:Java  == 关于 ==    <syntaxhighlight lang="Java" highlight="">  </syntaxhighlight>   <syntaxhighlight lang="Java" highlight="…”的新页面)  | 
				无编辑摘要  | 
				||
| 第3行: | 第3行: | ||
== 关于 ==  | == 关于 ==  | ||
代码示例来自官网:[https://www.rabbitmq.com/getstarted.html https://www.rabbitmq.com/getstarted.html]  | |||
Maven 依赖:  | |||
<syntaxhighlight lang="xml" highlight="">  | |||
    <dependencies>  | |||
        <dependency>  | |||
            <groupId>com.rabbitmq</groupId>  | |||
            <artifactId>amqp-client</artifactId>  | |||
            <version>3.6.0</version>  | |||
        </dependency>  | |||
        ... ...  | |||
    </dependencies>  | |||
</syntaxhighlight>  | |||
: [[File:RabbitMQ:Java示例.png|400px]]  | |||
== 关键代码 ==  | |||
* '''connection''':  | |||
*: <syntaxhighlight lang="Java" highlight="">  | |||
    ConnectionFactory factory = new ConnectionFactory();  | |||
    factory.setHost("localhost");  | |||
    factory.setPort(5672);  | |||
    factory.setUsername("guest");  | |||
    factory.setPassword("guest");  | |||
    factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器  | |||
    //创建与RabbitMQ服务的TCP连接  | |||
    connection = factory.newConnection();  | |||
</syntaxhighlight>  | |||
* '''channel''':  | |||
*: <syntaxhighlight lang="Java" highlight="">  | |||
    //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务  | |||
    channel = connection.createChannel();  | |||
</syntaxhighlight>  | |||
** channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。  | |||
* '''exchange'''、'''queue''':  | |||
*: <syntaxhighlight lang="Java" highlight="">  | |||
    /** 声明交换机,  | |||
    * 参数明细:  | |||
    * 1、String exchange:交换机名称  | |||
    * 2、BuiltinExchangeType type:交换机类型  | |||
    */  | |||
    channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);  | |||
    /** 声明队列  | |||
    * 参数明细:  | |||
    * 1、String queue:队列名称  | |||
    * 2、boolean durable:是否持久化  | |||
    * 3、boolean exclusive:是否独占此队列  | |||
    * 4、boolean autoDelete:队列不用是否自动删除  | |||
    * 5、Map<String, Object> arguments:参数  | |||
    */  | |||
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);  | |||
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);  | |||
    /** 绑定  | |||
    * 参数明细  | |||
    * 1、String queue:队列名称  | |||
    * 2、String exchange:交换机名称  | |||
    * 3、String routingKey:路由key  | |||
    */  | |||
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);  | |||
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);  | |||
</syntaxhighlight>  | |||
** 根据不同的工作模式,不一定需要声明交换机、队列,或是绑定:  | |||
**# 简单队列:  | |||
**# Work模式:  | |||
**# 发布订阅:  | |||
**# 路由模式:  | |||
**# 主题模式:  | |||
* 消息发送:  | |||
*: <syntaxhighlight lang="Java" highlight="">  | |||
    for (int i=0;i<10;i++){  | |||
        String message = "sms inform to user"+i;  | |||
        /** 向交换机发送消息  | |||
        * 参数明细  | |||
        * 1、String exchange:交换机名称  | |||
        * 2、String routingKey:路由key  | |||
        * 3、BasicProperties props:消息属性  | |||
        * 4、byte[] body:消息内容  | |||
        */  | |||
        channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());  | |||
        System.out.println("Send Message is:'" + message + "'");  | |||
    }  | |||
</syntaxhighlight>  | |||
** “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;  | |||
* 消息接收:  | |||
*: <syntaxhighlight lang="Java" highlight="">  | |||
    /**  | |||
    * 定义消费方法,写法一:  | |||
    *   | |||
    * DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {  | |||
    *     @Override  | |||
    *     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  | |||
    *         long deliveryTag = envelope.getDeliveryTag();  | |||
    *         String exchange = envelope.getExchange();  | |||
    *         // 消息内容  | |||
    *         String message = new String(body, "utf-8");  | |||
    *         System.out.println(message);  | |||
    *     }  | |||
    * };  | |||
    */  | |||
    /**  | |||
    * 定义消费方法,写法二:(Lambda表达式,手动确认消息)  | |||
    *   | |||
    * DeliverCallback deliverCallback = (consumerTag, delivery) -> {  | |||
    *     String message = new String(delivery.getBody(), "UTF-8");  | |||
    *       | |||
    *     try {  | |||
    *         // 调用消息处理  | |||
    *         doWork(message);  | |||
    *     } catch (InterruptedException e) {  | |||
    *         e.getMessage();  | |||
    *     } finally {  | |||
    *         System.out.println(" [x] 接收 '" + message + "'");  | |||
    *         // 手动确认消息  | |||
    *         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  | |||
    *     }  | |||
    * };  | |||
    // 定义消费方法,写法三:  | |||
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {  | |||
        String message = new String(delivery.getBody(), "UTF-8");  | |||
        System.out.println(" [x] 接收 '" + message + "'");  | |||
    };  | |||
    /**  | |||
    * 监听队列, ,  | |||
    * 参数明细  | |||
    * 1、String queue:队列名称  | |||
    * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复  | |||
    * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法  | |||
    */  | |||
    channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback);  | |||
</syntaxhighlight>  | |||
** 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)  | |||
== Hello World ==  | |||
<syntaxhighlight lang="Java" highlight="">  | <syntaxhighlight lang="Java" highlight="">  | ||
2021年5月24日 (一) 21:28的版本
关于
代码示例来自官网:https://www.rabbitmq.com/getstarted.html
Maven 依赖:
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.0</version>
        </dependency>
        ... ...
    </dependencies>
关键代码
- connection:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器 //创建与RabbitMQ服务的TCP连接 connection = factory.newConnection();
 - channel:
//创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务 channel = connection.createChannel();
- channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
 
 - exchange、queue:
/** 声明交换机, * 参数明细: * 1、String exchange:交换机名称 * 2、BuiltinExchangeType type:交换机类型 */ channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT); /** 声明队列 * 参数明细: * 1、String queue:队列名称 * 2、boolean durable:是否持久化 * 3、boolean exclusive:是否独占此队列 * 4、boolean autoDelete:队列不用是否自动删除 * 5、Map<String, Object> arguments:参数 */ channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null); /** 绑定 * 参数明细 * 1、String queue:队列名称 * 2、String exchange:交换机名称 * 3、String routingKey:路由key */ channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL); channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
- 根据不同的工作模式,不一定需要声明交换机、队列,或是绑定:
- 简单队列:
 - Work模式:
 - 发布订阅:
 - 路由模式:
 - 主题模式:
 
 
 - 消息发送:
for (int i=0;i<10;i++){ String message = "sms inform to user"+i; /** 向交换机发送消息 * 参数明细 * 1、String exchange:交换机名称 * 2、String routingKey:路由key * 3、BasicProperties props:消息属性 * 4、byte[] body:消息内容 */ channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'"); }
- “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
 
 - 消息接收:
/** * 定义消费方法,写法一: * * DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { * @Override * public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { * long deliveryTag = envelope.getDeliveryTag(); * String exchange = envelope.getExchange(); * // 消息内容 * String message = new String(body, "utf-8"); * System.out.println(message); * } * }; */ /** * 定义消费方法,写法二:(Lambda表达式,手动确认消息) * * DeliverCallback deliverCallback = (consumerTag, delivery) -> { * String message = new String(delivery.getBody(), "UTF-8"); * * try { * // 调用消息处理 * doWork(message); * } catch (InterruptedException e) { * e.getMessage(); * } finally { * System.out.println(" [x] 接收 '" + message + "'"); * // 手动确认消息 * channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); * } * }; // 定义消费方法,写法三: DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; /** * 监听队列, , * 参数明细 * 1、String queue:队列名称 * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复 * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法 */ channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback);
- 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)
 
 
Hello World