RabbitMQ:Java(工作模式)

来自Wikioe
Eijux讨论 | 贡献2021年5月25日 (二) 00:25的版本 →‎主题模式
跳到导航 跳到搜索


关于

代码示例来自官网:https://www.rabbitmq.com/getstarted.html


Maven 依赖:

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.6.0</version>
        </dependency>
        ... ...
    </dependencies>
文件:RabbitMQ:Java示例.png

关键代码

  • connection
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
        
        //创建与RabbitMQ服务的TCP连接
        connection = factory.newConnection();
    
  • channel
        //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
        channel = connection.createChannel();
    
    • channel 是必须的:生产者、消费者都通过 channel 与 exchange 交互的。
  • exchangequeue
        /** 声明交换机,
        * 参数明细:
        * 1、String exchange:交换机名称
        * 2、BuiltinExchangeType type:交换机类型
        */
        channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
        
        /** 声明队列
        * 参数明细:
        * 1、String queue:队列名称
        * 2、boolean durable:是否持久化
        * 3、boolean exclusive:是否独占此队列
        * 4、boolean autoDelete:队列不用是否自动删除
        * 5、Map<String, Object> arguments:参数
        */
        channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
        channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
        
        /** 绑定
        * 参数明细
        * 1、String queue:队列名称
        * 2、String exchange:交换机名称
        * 3、String routingKey:路由key
        */
        channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);
        channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);
    
    • 根据不同的工作模式,不一定需要声明交换机、队列,或是绑定:
      1. 简单队列:
      2. Work模式:
      3. 发布订阅:
      4. 路由模式:
      5. 主题模式:
  • 消息发送
        for (int i=0;i<10;i++){
            String message = "sms inform to user"+i;
            
            /** 向交换机发送消息
            * 参数明细
            * 1、String exchange:交换机名称
            * 2、String routingKey:路由key
            * 3、BasicProperties props:消息属性
            * 4、byte[] body:消息内容
            */
            channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null, message.getBytes());
            
            System.out.println("Send Message is:'" + message + "'");
        }
    
    • “channel.basicPublish()”:简单队列、Work模式,不需要指定 exchange;
  • 消息接收
        /**
        * 定义消费方法,写法一:
        * 
        * DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
        *     @Override
        *     public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        *         long deliveryTag = envelope.getDeliveryTag();
        *         String exchange = envelope.getExchange();
        *         // 消息内容
        *         String message = new String(body, "utf-8");
        *         System.out.println(message);
        *     }
        * };
        */
        
        /**
        * 定义消费方法,写法二:(Lambda表达式,手动确认消息)
        * 
        * DeliverCallback deliverCallback = (consumerTag, delivery) -> {
        *     String message = new String(delivery.getBody(), "UTF-8");
        *     
        *     try {
        *         // 调用消息处理
        *         doWork(message);
        *     } catch (InterruptedException e) {
        *         e.getMessage();
        *     } finally {
        *         System.out.println(" [x] 接收 '" + message + "'");
        *         // 手动确认消息
        *         channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        *     }
        * };
        
        // 定义消费方法,写法三:
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] 接收 '" + message + "'");
        };
        
        /**
        * 监听队列, ,
        * 参数明细
        * 1、String queue:队列名称
        * 2、boolean autoAck:是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置为false则需要手动回复
        * 3、Consumer callback:消费消息的方法,消费者接收到消息后调用此方法
        */
        channel.basicConsume(QUEUE_INFORM_EMAIL, true, deliverCallback);
    
    • 可以手动或自动确认消息;(若不确认消息,MQ 就会认为这个消息没执行)

Hello World

  1. 生产者:
    package top.codexu.rabbitmq.hello;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;
    
    public class Send {
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            
            try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
                 
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World!";
                
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] 发送 '" + message + "'");
            }
        }
    }
    
    • 可以使用 try-with-resource 语句,不必再显式地关闭资源;(Connection 和 Channel 都实现 java.io.Closeable)
  2. 消费者:
    package top.codexu.rabbitmq.hello;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class Recv {
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] 等待消息.");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 接收 '" + message + "'");
            };
    
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
    
    • 在这里也声明了队列:因为我们可能会在发布服务器之前启动使用者,所以我们希望在尝试使用来自队列的消息之前确保该队列存在
    • 不使用 try-with-resource 语句自动关闭通道和连接:因为我们希望在消费者异步侦听消息到达时,该过程保持活动状态。

Work模式

工作队列是啥?

就是当遇到了运行耗时久的任务,并且还得等待它完成,这个时候就可以使用工作队列,把这个耗时任务发送给别的工人(消费者)进行处理,生产者可以直接得到处理完的情况。

相关特性:

  1. 消息确认:消费者发送通知告诉 MQ 处理完成了;
  2. 消息持久:遇到了宕机后,会把消息给缓存或保存下来,使得下次启动能够不丢失(但不是百分百);
  3. 循环调度:若开着两个或三个消费者的时候,当多个消息要接收,MQ 是会自动循环找下一个,避免一直重复同一个或几个;
  4. 公平派遣:当有两个消费者的时候,若一个消费者一直再累死累活,另外一个逍遥自在,这是不利于效率提升的,故可以通过设置,限制若A忙就找B去。


  1. 生产者:
    package top.codexu.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import java.nio.charset.StandardCharsets;
    
    public class NewTask {
        private final static String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
                
                // 消息持久化(非百分百保证)。前提:队列第一次声明就是持久的,并且在生产者端和消费者端都要声明为true。
                boolean durable = true;
                channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
    
                for (int i = 0; i <= 5; i++) {
                    String message = "第" + i + "个消息.";
                    
                    // MessageProperties.PERSISTENT_TEXT_PLAIN:将消息标记为持久性,确保即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。
                    channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
                    System.out.println(" [x] 发送 '" + message + "'");
                }
            }
        }
    }
    
    • 将消息标记为持久性并不能完全保证消息不会丢失
      尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 接受消息但尚未保存消息的时间窗口仍然很短。而且,RabbitMQ 并不是对每条消息都执行“fsync(2)”:它可能只是保存到缓存中,而不是真正写入磁盘。
      持久性保证不强,但对于简单任务队列来说已经足够了。如果你需要一个更强有力的保证,那么你可以使用“publisher confirms”。
  2. 消费者:
    package top.codexu.rabbitmq.work;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class Worker {
    
        private final static String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    		
            // 消息持久化(非百分百保证)。消费者端同发布者一样设置为 true
            boolean durable = true;
            channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
            System.out.println(" [*] 等待消息.");
    
            // 公平派遣设置:一次仅接受一条未经确认的消息,为了实现公平的安排(其他人闲着就安排其他人)
            channel.basicQos(1);
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 接收 '" + message + "'");
                
                try {
                    doWork(message);
                } catch (InterruptedException e) {
                    e.getMessage();
                } finally {
                    System.out.println(" [x] 结束");
                    
                    // 执行完成,返回确认消息。
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            
            // 消息确认:目的就是为了不丢失消息,使得确保每个消息有执行成功。(网络抖动也不怕,会找别人执行)
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    
        }
    
        private static void doWork(String task) throws InterruptedException {
            for (char ch : task.toCharArray()) {
                if (ch == '.') Thread.sleep(1000);
            }
        }
    }
    


RabbitMQ 查看当前队列有那些消息还未确定:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
work    0       3
hello   0       0

(messages_ready:等待接收的消息;messages_unacknowledged:已接受但未确认的消息)

发布订阅

fanout 交换非常简单,它只是将接收到的所有消息广播到它知道的所有队列


不同于以上使用具有特定名称的队列(在生产者和消费者之间共享队列时,为队列指定名称非常重要),发布订阅模式中,我们希望听到所有日志消息,而不仅仅是其中的一个子集。我们也只对当前的流消息感兴趣,而不是对旧消息感兴趣。要解决这个问题,我们就需要临时队列

  1. 一个新的、空的,随机名称的队列;
  2. 一旦断开连接,队列就被自动删除。

在 Java客户机中,当我们不向“queueDeclare()”提供任何参数时,将创建一个非持久的、独占的、自动删除的队列,并使用生成的名称:

String queueName = channel.queueDeclare().getQueue();   // queueName包含一个随机队列名称,如:amq.gen-JzTY20BRgKO-hjmuj0wlg


  1. 生产者:
    package top.codexu.rabbitmq.publicsubscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLog {
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            
            try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
                
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
                String message = "info: Hello World!";
    
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] 发送 '" + message + "'");
            }
        }
    }
    
  2. 消费者:
    package top.codexu.rabbitmq.publicsubscribe;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class ReceiveLogs {
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            System.out.println(" [*] 等待消息.");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 接收 '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
    


RabbitMQ 查看绑定关系:

rabbitmqctl list_bindings

路由模式

  1. 生产者:
    package top.codexu.rabbitmq.routing;
    
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLogDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
    
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                
                String severity = "warning";
                String message = "Hello World!";
                
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
            }
        }
    
    }
    
  2. 消费者:
    package top.codexu.rabbitmq.routing;
    
    import com.rabbitmq.client.*;
    
    public class ReceiveLogsDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            String queueName1 = channel.queueDeclare().getQueue();
            String queueName2 = channel.queueDeclare().getQueue();
    
            String[] severities
            severities = {"info", "warning", "error"};
            for (String severity : severities1) {
                channel.queueBind(queueName1, EXCHANGE_NAME, severity);
            }
            severities = {"info"};
            for (String severity : severities2) {
                channel.queueBind(queueName2, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] 等待消息.");
    
    
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 队列1接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
    
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 队列2接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        }
    }
    
    • 如上,一个 queue 可以绑定多个 routingKey。

主题模式

  1. 生产者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLogTopic {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
    
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
                String[] routingKeys = new String[]{"quick.orange.rabbit", 
    												"lazy.orange.elephant", 
    												"quick.orange.fox", 
    												"lazy.brown.fox", 
    												"quick.brown.fox", 
    												"quick.orange.male.rabbit", 
    												"lazy.orange.male.rabbit"};
                for(String severity : routingKeys){
                    String message = "From " + severity + " routingKey' s message!";
                    channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
                }
            }
        }
        //..
    }
    
  2. 消费者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class ReceiveLogsTopic {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String queueName1 = channel.queueDeclare().getQueue();
            String queueName2 = channel.queueDeclare().getQueue();
    
            String[] routingKeys;
            routingKeys = new String[]{"*.orange.*"};
            for (String bindingKey : routingKeys) {
                channel.queueBind(queueName1, EXCHANGE_NAME, bindingKey);
            }
            routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
            for (String bindingKey : routingKeys) {
                channel.queueBind(queueName2, EXCHANGE_NAME, bindingKey);
            }
    
            System.out.println(" [*] 等待消息.");
    
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
            
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        }
    }
    

标题文字

  1. 生产者:
  2. 消费者:


  1. 生产者:
  2. 消费者:


  1. 生产者:
  2. 消费者: