“RabbitMQ:Java(工作模式)”的版本间差异

来自Wikioe
跳到导航 跳到搜索
第74行: 第74行:
**# 路由模式:
**# 路由模式:
**# 主题模式:
**# 主题模式:
* 消息发送:
* '''消息发送''':
*: <syntaxhighlight lang="Java" highlight="">
*: <syntaxhighlight lang="Java" highlight="">
     for (int i=0;i<10;i++){
     for (int i=0;i<10;i++){
第92行: 第92行:
</syntaxhighlight>
</syntaxhighlight>
** “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
** “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
* 消息接收:
* '''消息接收''':
*: <syntaxhighlight lang="Java" highlight="">
*: <syntaxhighlight lang="Java" highlight="">
     /**
     /**

2021年5月24日 (一) 21:29的版本


关于

代码示例来自官网:https://www.rabbitmq.com/getstarted.html


Maven 依赖:

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.0</version>
        </dependency>
        ... ...
    </dependencies>
文件:RabbitMQ:Java示例.png

关键代码

  • connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
        
        //创建与RabbitMQ服务的TCP连接
        connection = factory.newConnection();
    
  • channel
        //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
        channel = connection.createChannel();
    
    • channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
  • exchangequeue
        /** 声明交换机,
        * 参数明细:
        * 1、String exchange:交换机名称
        * 2、BuiltinExchangeType type:交换机类型
        */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        
        /** 声明队列
        * 参数明细:
        * 1、String queue:队列名称
        * 2、boolean durable:是否持久化
        * 3、boolean exclusive:是否独占此队列
        * 4、boolean autoDelete:队列不用是否自动删除
        * 5、Map<String, Object> arguments:参数
        */
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        
        /** 绑定
        * 参数明细
        * 1、String queue:队列名称
        * 2、String exchange:交换机名称
        * 3、String routingKey:路由key
        */
        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
        channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
    
    • 根据不同的工作模式,不一定需要声明交换机、队列,或是绑定:
      1. 简单队列:
      2. Work模式:
      3. 发布订阅:
      4. 路由模式:
      5. 主题模式:
  • 消息发送
        for (int i=0;i<10;i++){
            String message = "sms inform to user"+i;
            
            /** 向交换机发送消息
            * 参数明细
            * 1、String exchange:交换机名称
            * 2、String routingKey:路由key
            * 3、BasicProperties props:消息属性
            * 4、byte[] body:消息内容
            */
            channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());
            
            System.out.println("Send Message is:'" + message + "'");
        }
    
    • “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
  • 消息接收
        /**
        * 定义消费方法,写法一:
        * 
        * DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
        *     @Override
        *     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        *         long deliveryTag = envelope.getDeliveryTag();
        *         String exchange = envelope.getExchange();
        *         // 消息内容
        *         String message = new String(body, "utf-8");
        *         System.out.println(message);
        *     }
        * };
        */
        
        /**
        * 定义消费方法,写法二:(Lambda表达式,手动确认消息)
        * 
        * DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        *     String message = new String(delivery.getBody(), "UTF-8");
        *     
        *     try {
        *         // 调用消息处理
        *         doWork(message);
        *     } catch (InterruptedException e) {
        *         e.getMessage();
        *     } finally {
        *         System.out.println(" [x] 接收 '" + message + "'");
        *         // 手动确认消息
        *         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        *     }
        * };
        
        // 定义消费方法,写法三:
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 接收 '" + message + "'");
        };
        
        /**
        * 监听队列, ,
        * 参数明细
        * 1、String queue:队列名称
        * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
        * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法
        */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback);
    
    • 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)

Hello World