RabbitMQ:Java(工作模式)

来自Wikioe
跳到导航 跳到搜索


关于

根据不同的工作模式,不一定需要声明 exchange、queue,或 binding:

  1. 简单队列:直接与 queue 交互(使用无名 exchange,即:默认的 direct)。
    • 生产者:声明 queue;
    • 消费者:声明 queue;
      • 生产者、消费者共享队列,所以 queue 名称需相同,且 routingKey 即为 queue名称;
    • 发送消息:指定 routingKey(即:queue名称),而不指定 exchange;
  2. Work模式:(同上)
  3. 发布订阅:消息发送到绑定的每一个队列。
    • 生产者:声明 exchange;
    • 消费者:声明 exchange,声明 queue(临时队列),且 binding(不指定 routingKey);
    • 发送消息:指定 exchange,而不指定 routingKey;(消息发送到每一个队列,所以不需要声明 queue)
  4. 路由模式:根据 routingKey 相等,消息发送到绑定的队列。
    • 生产者:声明 exchange;
    • 消费者:声明 exchange,声明 queue(临时队列),且 binding(指定 routingKey);
    • 发送消息:指定 exchange,指定 routingKey;
  5. 主题模式:根据 routingKey 匹配,消息发送到绑定的队列。
    • 生产者:声明 exchange;
    • 消费者:声明 exchange,声明 queue(临时队列),且 binding(指定 routingKey 通配符);
    • 发送消息:指定 exchange,指定 routingKey;


Hello World

  1. 生产者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;
    
    public class Send {
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            
            try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
                 
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World!";
                
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] 发送 '" + message + "'");
            }
        }
    }
    
    • 可以使用 try-with-resource 语句,不必再显式地关闭资源;(Connection 和 Channel 都实现 java.io.Closeable)
  2. 消费者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class Recv {
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] 等待消息.");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 接收 '" + message + "'");
            };
    
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
    
    • 在这里也声明了队列:因为我们可能会在发布服务器之前启动使用者,所以我们希望在尝试使用来自队列的消息之前确保该队列存在
    • 不使用 try-with-resource 语句自动关闭通道和连接:因为我们希望在消费者异步侦听消息到达时,该过程保持活动状态。

Work模式

工作队列是啥?

就是当遇到了运行耗时久的任务,并且还得等待它完成,这个时候就可以使用工作队列,把这个耗时任务发送给别的工人(消费者)进行处理,生产者可以直接得到处理完的情况。

相关特性:

  1. 消息确认:消费者发送通知告诉 MQ 处理完成了;
  2. 消息持久:遇到了宕机后,会把消息给缓存或保存下来,使得下次启动能够不丢失(但不是百分百);
  3. 循环调度:若开着两个或三个消费者的时候,当多个消息要接收,MQ 是会自动循环找下一个,避免一直重复同一个或几个;
  4. 公平派遣:当有两个消费者的时候,若一个消费者一直再累死累活,另外一个逍遥自在,这是不利于效率提升的,故可以通过设置,限制若A忙就找B去。


  1. 生产者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    import java.nio.charset.StandardCharsets;
    
    public class NewTask {
        private final static String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
                
                // 消息持久化(非百分百保证)。前提:队列第一次声明就是持久的,并且在生产者端和消费者端都要声明为true。
                boolean durable = true;
                channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
    
                for (int i = 0; i <= 5; i++) {
                    String message = "第" + i + "个消息.";
                    
                    // MessageProperties.PERSISTENT_TEXT_PLAIN:将消息标记为持久性,确保即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。
                    channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
                    System.out.println(" [x] 发送 '" + message + "'");
                }
            }
        }
    }
    
    • 将消息标记为持久性并不能完全保证消息不会丢失
      尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 接受消息但尚未保存消息的时间窗口仍然很短。而且,RabbitMQ 并不是对每条消息都执行“fsync(2)”:它可能只是保存到缓存中,而不是真正写入磁盘。
      持久性保证不强,但对于简单任务队列来说已经足够了。如果你需要一个更强有力的保证,那么你可以使用“publisher confirms”。
  2. 消费者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class Worker {
    
        private final static String TASK_QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            final Connection connection = factory.newConnection();
            final Channel channel = connection.createChannel();
    		
            // 消息持久化(非百分百保证)。消费者端同发布者一样设置为 true
            boolean durable = true;
            channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
            System.out.println(" [*] 等待消息.");
    
            // 公平派遣设置:一次仅接受一条未经确认的消息,为了实现公平的安排(其他人闲着就安排其他人)
            channel.basicQos(1);
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 接收 '" + message + "'");
                
                try {
                    doWork(message);
                } catch (InterruptedException e) {
                    e.getMessage();
                } finally {
                    System.out.println(" [x] 结束");
                    
                    // 执行完成,返回确认消息。
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            
            // 消息确认:目的就是为了不丢失消息,使得确保每个消息有执行成功。(网络抖动也不怕,会找别人执行)
            boolean autoAck = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
    
        }
    
        private static void doWork(String task) throws InterruptedException {
            for (char ch : task.toCharArray()) {
                if (ch == '.') Thread.sleep(1000);
            }
        }
    }
    


RabbitMQ 查看当前队列有那些消息还未确定:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
name    messages_ready  messages_unacknowledged
work    0       3
hello   0       0

(messages_ready:等待接收的消息;messages_unacknowledged:已接受但未确认的消息)

发布订阅

fanout 交换非常简单,它只是将接收到的所有消息广播到它知道的所有队列


不同于以上使用具有特定名称的队列(在生产者和消费者之间共享队列时,为队列指定名称非常重要),发布订阅模式中,我们希望听到所有日志消息,而不仅仅是其中的一个子集。我们也只对当前的流消息感兴趣,而不是对旧消息感兴趣。要解决这个问题,我们就需要临时队列

  1. 一个新的、空的,随机名称的队列;
  2. 一旦断开连接,队列就被自动删除。

在 Java客户机中,当我们不向“queueDeclare()”提供任何参数时,将创建一个非持久的、独占的、自动删除的队列,并使用生成的名称:

String queueName = channel.queueDeclare().getQueue();   // queueName包含一个随机队列名称,如:amq.gen-JzTY20BRgKO-hjmuj0wlg


  1. 生产者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLog {
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            
            try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
                
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
    
                String message = "info: Hello World!";
    
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] 发送 '" + message + "'");
            }
        }
    }
    
  2. 消费者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class ReceiveLogs {
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            System.out.println(" [*] 等待消息.");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 接收 '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
    


RabbitMQ 查看绑定关系:

rabbitmqctl list_bindings

路由模式

  1. 生产者:
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLogDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
    
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
                
                String severity = "warning";
                String message = "Hello World!";
                
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
            }
        }
    
    }
    
  2. 消费者:
    import com.rabbitmq.client.*;
    
    public class ReceiveLogsDirect {
    
        private static final String EXCHANGE_NAME = "direct_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            String queueName1 = channel.queueDeclare().getQueue();
            String queueName2 = channel.queueDeclare().getQueue();
    
            String[] severities
            severities = {"info", "warning", "error"};
            for (String severity : severities1) {
                channel.queueBind(queueName1, EXCHANGE_NAME, severity);
            }
            severities = {"info"};
            for (String severity : severities2) {
                channel.queueBind(queueName2, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] 等待消息.");
    
    
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 队列1接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
    
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] 队列2接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        }
    }
    
    • 如上,一个 queue 可以绑定多个 routingKey。

主题模式

  1. 生产者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLogTopic {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                Channel channel = connection.createChannel()) {
    
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
                String[] routingKeys = new String[]{"quick.orange.rabbit", 
    												"lazy.orange.elephant", 
    												"quick.orange.fox", 
    												"lazy.brown.fox", 
    												"quick.brown.fox", 
    												"quick.orange.male.rabbit", 
    												"lazy.orange.male.rabbit"};
                for(String severity : routingKeys){
                    String message = "From " + severity + " routingKey' s message!";
                    channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
                }
            }
        }
        //..
    }
    
  2. 消费者:
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class ReceiveLogsTopic {
    
        private static final String EXCHANGE_NAME = "topic_logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
            String queueName1 = channel.queueDeclare().getQueue();
            String queueName2 = channel.queueDeclare().getQueue();
    
            String[] routingKeys;
            routingKeys = new String[]{"*.orange.*"};
            for (String bindingKey : routingKeys) {
                channel.queueBind(queueName1, EXCHANGE_NAME, bindingKey);
            }
            routingKeys = new String[]{"*.*.rabbit", "lazy.#"};
            for (String bindingKey : routingKeys) {
                channel.queueBind(queueName2, EXCHANGE_NAME, bindingKey);
            }
    
            System.out.println(" [*] 等待消息.");
    
            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { });
            
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        }
    }
    

Remote procedure call(RPC,远程过程调用)

RPC 模式,关于如何使用工作队列在多个 workers 之间分配耗时的任务。

虽然RPC是计算中非常常见的模式,但它经常受到批评。当程序员不知道某个函数调用是本地调用还是慢速RPC时,就会出现问题。这样的混乱会导致不可预知的系统,并给调试增加不必要的复杂性。与简化软件相反,误用RPC会导致无法维护的意大利面代码。

考虑到这一点,请考虑以下建议:
  确定哪个函数调用是本地的,哪个是远程的。
  记录您的系统。明确组件之间的依赖关系。
  处理错误案例。当RPC服务器长时间停机时,客户端应该如何反应?
  
如有疑问,请避免使用RPC。如果可以的话,应该使用异步管道,而不是类似RPC的阻塞,来将结果异步推送到下一个计算阶段。

以下,将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可伸缩的 RPC 服务器。(因为我们没有任何值得分发的耗时任务,所以我们将创建一个返回斐波那契数的伪 RPC 服务)



回调队列:一般来说,在 RabbitMQ 上执行 RPC 很容易。客户端发送一条请求消息,服务器用一条响应消息进行回复。为了接收响应,我们需要随请求一起发送“回调”队列地址。

我们可以使用默认队列(在Java客户机中是独占的)。
callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
                            .Builder()
                            .replyTo(callbackQueueName)
                            .build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());
  • 以上内容需要一个新的导入:
    import com.rabbitmq.client.AMQP.BasicProperties;
    
  • 消息属性:AMQP 0-9-1协议预定义了消息附带的14个属性集。 除以下内容外,大多数属性很少使用:
    • “deliveryMode”:将消息标记为持久(值为2)或暂时(任何其他值)。
    • “contentType”:用于描述编码的mime类型。
    • replyTo”:通常用于命名回调队列。
    • correlationId”:用于将 RPC 响应与请求关联起来。



RabbitMQ:RPC示例.png
RabbitMQ:RPC示例 1.png

我们的 RPC 将像这样工作:

  1. 对于 RPC 请求,客户端发送一条具有两个属性的消息:replyTo(设置为仅为该请求创建的匿名排他队列)和 correlationId(设置为每个请求的唯一值)。
  2. 该请求被发送到 rpc_queue 队列。
  3. RPC 工作程序(即:服务器)正在等待该队列上的请求:出现请求时,它完成工作,并会使用 replyTo 字段中的队列来并将带有结果的消息发送回客户端。
  4. 客户机等待应答队列上的数据:出现消息时,它会检查 correlationId 属性。 如果它与请求中的值匹配,则将响应返回给应用程序。


  1. 服务器代码非常简单:
    • 同以前示例一样,从建立 connection,channel 和声明 queue 开始。
    • 我们可能要运行多个服务器进程:为了将负载平均分配到多个服务器上,我们需要在 channel.basicQos中 设置 prefetchCount 设置。
    • 我们使用 basicConsume 访问队列,在队列中我们以对象(DeliverCallback)的形式提供回调,该回调将完成工作并将响应发送回去。
    import com.rabbitmq.client.*;
    
    public class RPCServer {
    
        private static final String RPC_QUEUE_NAME = "rpc_queue";
    
        private static int fib(int n) {
            if (n == 0) return 0;
            if (n == 1) return 1;
            return fib(n - 1) + fib(n - 2);
        }
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
                channel.queuePurge(RPC_QUEUE_NAME);
    
                channel.basicQos(1);
    
                System.out.println(" [x] Awaiting RPC requests");
    
                Object monitor = new Object();
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(delivery.getProperties().getCorrelationId())
                            .build();
    
                    String response = "";
    
                    try {
                        String message = new String(delivery.getBody(), "UTF-8");
                        int n = Integer.parseInt(message);
    
                        System.out.println(" [.] fib(" + message + ")");
                        response += fib(n);
                    } catch (RuntimeException e) {
                        System.out.println(" [.] " + e.toString());
                    } finally {
                        channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        // RabbitMq consumer worker thread notifies the RPC server owner thread
                        synchronized (monitor) {
                            monitor.notify();
                        }
                    }
                };
    
                channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
                // Wait and be prepared to consume the message from RPC client.
                while (true) {
                    synchronized (monitor) {
                        try {
                            monitor.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    
  2. 客户机代码稍微复杂一些:
    • 建立 connection 和 channel。
    • call 方法发出实际的 RPC 请求。
    • 在这里,我们首先生成一个唯一的 correlationId 号并保存它—我们的使用者回调将使用这个值来匹配相应的响应。
    • 然后,我们为回复创建一个专用的独占 queue 并订阅它。
    • 接下来,我们发布请求消息,其中包含两个属性:replyTocorrelationId
    • 此时,我们可以等待直到正确的响应到达。
    • 因为我们的消费者传递处理是在一个单独的线程中进行的,所以在响应到达之前,我们需要一些东西来挂起主线程。使用 BlockingQueue 是一种可能的解决方案。在这里,我们正在创建容量设置为 1 的 ArrayBlockingQueue,因为我们只需要等待一个响应。
    • 消费者正在做一项非常简单的工作,对于每个消费的响应消息,它都会检查 correlationId 是否就是我们要查找的。如果是这样,它将响应放入 BlockingQueue
    • 同时,主线程正在等待响应将其从 BlockingQueue 中取出。
    • 最后,我们将响应返回给用户。
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeoutException;
    
    public class RPCClient implements AutoCloseable {
    
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "rpc_queue";
    
        public RPCClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
    
            connection = factory.newConnection();
            channel = connection.createChannel();
        }
    
        public static void main(String[] argv) {
            try (RPCClient fibonacciRpc = new RPCClient()) {
                for (int i = 0; i < 32; i++) {
                    String i_str = Integer.toString(i);
                    System.out.println(" [x] Requesting fib(" + i_str + ")");
                    String response = fibonacciRpc.call(i_str);
                    System.out.println(" [.] Got '" + response + "'");
                }
            } catch (IOException | TimeoutException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        public String call(String message) throws IOException, InterruptedException {
            final String corrId = UUID.randomUUID().toString();
    
            String replyQueueName = channel.queueDeclare().getQueue();
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
    
            channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
    
            final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
    
            String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
                if (delivery.getProperties().getCorrelationId().equals(corrId)) {
                    response.offer(new String(delivery.getBody(), "UTF-8"));
                }
            }, consumerTag -> {
            });
    
            String result = response.take();
            channel.basicCancel(ctag);
            return result;
        }
    
        public void close() throws IOException {
            connection.close();
        }
    }
    


以上只是 RabbitMQ 中 RPC 服务的一个实现,你也可以根据业务需要实现更多。

  • rpc有一个优点,如果一个RPC服务器处理不来,可以再增加一个、两个、三个。


上述代码还比较简单,还有很多问题没有解决:

  • 如果没有发现服务器,客户端如何处理?
  • 如果客户端的 RPC 请求超时了怎么办?
  • 如果服务器出现了故障,发生了异常,是否将异常发送到客户端?
  • 在处理消息前,怎样防止无效的消息?检查范围、类型?