“RabbitMQ:Java(关键代码)”的版本间差异

来自Wikioe
跳到导航 跳到搜索
(创建页面,内容为“category:RabbitMQ category:Java == 关于 == 参考:'''[https://www.rabbitmq.com/getstarted.html RabbitMQ Tutorials]''' Maven 依赖: <syntaxhighlight lang="xml" highlight=""> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>3.6.0</version> </dependency> ... ... </dependencies> </syntaxhighlight> == 关键代码 == * '''co…”)
 
无编辑摘要
 
第1行: 第1行:
[[category:RabbitMQ]]
[[category:RabbitMQ]]
[[category:Java]]
[[category:Java]]
__TOC__


== 关于 ==
== 关于 ==
参考:'''[https://www.rabbitmq.com/getstarted.html RabbitMQ Tutorials]'''
参考:'''[https://www.rabbitmq.com/getstarted.html RabbitMQ Tutorials]'''
 


Maven 依赖:
Maven 依赖:
<syntaxhighlight lang="xml" highlight="">
: <syntaxhighlight lang="xml" highlight="">
     <dependencies>
     <dependencies>
         <dependency>
         <dependency>
第19行: 第20行:


== 关键代码 ==
== 关键代码 ==
* '''connection''':
=== connection ===
*: <syntaxhighlight lang="Java" highlight="">
: <syntaxhighlight lang="Java" highlight="">
     ConnectionFactory factory = new ConnectionFactory();
     ConnectionFactory factory = new ConnectionFactory();
     factory.setHost("localhost");
     factory.setHost("localhost");
第31行: 第32行:
     connection = factory.newConnection();
     connection = factory.newConnection();
</syntaxhighlight>
</syntaxhighlight>
* '''channel''':
 
*: <syntaxhighlight lang="Java" highlight="">
=== channel ===
: <syntaxhighlight lang="Java" highlight="">
     //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
     //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
     channel = connection.createChannel();
     channel = connection.createChannel();
</syntaxhighlight>
</syntaxhighlight>
** channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
* channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
* '''exchange'''、'''queue'''、'''binding''':
 
*: <syntaxhighlight lang="Java" highlight="">
=== exchange'''、'''queue'''、'''binding ===
: <syntaxhighlight lang="Java" highlight="">
     /** 声明交换机,
     /** 声明交换机,
     * 参数明细:
     * 参数明细:
第73行: 第76行:
     channel.queueBind(queueName, EXCHANGE_ROUTING_INFORM, "");
     channel.queueBind(queueName, EXCHANGE_ROUTING_INFORM, "");
</syntaxhighlight>
</syntaxhighlight>
* '''消息发送''':
 
*: <syntaxhighlight lang="Java" highlight="">
=== 消息发送 ===
: <syntaxhighlight lang="Java" highlight="">
     for (int i=0;i<10;i++){
     for (int i=0;i<10;i++){
         String message = "sms inform to user"+i;
         String message = "sms inform to user"+i;
第90行: 第94行:
     }
     }
</syntaxhighlight>
</syntaxhighlight>
** “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
* “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
* '''消息接收''':
 
*: <syntaxhighlight lang="Java" highlight="">
=== 消息接收 ===
: <syntaxhighlight lang="Java" highlight="">
     /**
     /**
     * 定义消费方法
     * 定义消费方法
第109行: 第114行:
        *    System.out.println(" [x] 结束");
        *    System.out.println(" [x] 结束");
        *    // 手动确认消息
        *    // 手动确认消息
      *    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        *    // DeliveryTag:唯一地标识通道上的传递
        *    // false:是否批量确认
        *    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        * }
        * }
         */
         */
第124行: 第131行:
     channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
     channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
</syntaxhighlight>
</syntaxhighlight>
*: <syntaxhighlight lang="Java" highlight="">
: <syntaxhighlight lang="Java" highlight="">
     /**
     /**
     * 定义消费方法
     * 定义消费方法
第148行: 第155行:
     channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
     channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
</syntaxhighlight>
</syntaxhighlight>
** 使用额外的 '''DeliverCallback''' 接口来缓冲服务器推送给我们的消息。
* 使用额外的 '''DeliverCallback''' 接口来缓冲服务器推送给我们的消息。
** 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)
* 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)

2022年10月31日 (一) 21:56的最新版本


关于

参考:RabbitMQ Tutorials

Maven 依赖:

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.0</version>
        </dependency>
        ... ...
    </dependencies>

关键代码

connection

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setPort(5672);
    factory.setUsername("guest");
    factory.setPassword("guest");
    factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
    
    //创建与RabbitMQ服务的TCP连接
    connection = factory.newConnection();

channel

    //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
    channel = connection.createChannel();
  • channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。

exchangequeuebinding

    /** 声明交换机,
    * 参数明细:
    * 1、String exchange:交换机名称
    * 2、BuiltinExchangeType type:交换机类型
    */
    channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
    
    /** 声明队列
    * 参数明细:
    * 1、String queue:队列名称
    * 2、boolean durable:是否持久化
    * 3、boolean exclusive:是否独占此队列
    * 4、boolean autoDelete:队列不用是否自动删除
    * 5、Map<String, Object> arguments:参数
    */
    channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
    channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
    
    /** 绑定
    * 参数明细
    * 1、String queue:队列名称
    * 2、String exchange:交换机名称
    * 3、String routingKey:路由key
    */
    channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
    channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
    
    
    /** 
    * 临时队列
    */
    String queueName = channel.queueDeclare().getQueue();
    channel.queueBind(queueName, EXCHANGE_ROUTING_INFORM, "");

消息发送

    for (int i=0;i<10;i++){
        String message = "sms inform to user"+i;
        
        /** 向交换机发送消息
        * 参数明细
        * 1、String exchange:交换机名称
        * 2、String routingKey:路由key
        * 3、BasicProperties props:消息属性
        * 4、byte[] body:消息内容
        */
        channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());
        
        System.out.println("Send Message is:'" + message + "'");
    }
  • “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;

消息接收

    /**
    * 定义消费方法
    */
    DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        String message = new String(delivery.getBody(), "UTF-8");
        System.out.println(" [x] 接收 '" + message + "'");
        
        /**
        * try {
        *     // 调用消息处理
        *     doWork(message);
        * } catch (InterruptedException e) {
        *     e.getMessage();
        * } finally {
        *     System.out.println(" [x] 结束");
        *     // 手动确认消息
        *     // DeliveryTag:唯一地标识通道上的传递
        *     // false:是否批量确认
        *     channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        * }
        */
    };
    
    /**
    * 监听队列
    * 参数明细
    * 1、String queue:队列名称
    * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
    * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法
    * 4、
    */
    channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    /**
    * 定义消费方法
    */
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            long deliveryTag = envelope.getDeliveryTag();
            String exchange = envelope.getExchange();
            // 消息内容
            String message = new String(body, "utf-8");
            System.out.println(message);
        }
    };
    
    /**
    * 监听队列
    * 参数明细
    * 1、String queue:队列名称
    * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
    * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法
    */
    channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
  • 使用额外的 DeliverCallback 接口来缓冲服务器推送给我们的消息。
  • 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)