查看“RabbitMQ:Java(工作模式)”的源代码
←
RabbitMQ:Java(工作模式)
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:RabbitMQ]] [[category:Java]] == '''关于''' == 根据不同的工作模式,不一定需要声明 exchange、queue,或 binding: # '''简单队列''':直接与 queue 交互(使用无名 exchange,即:默认的 direct)。 #* 生产者:声明 queue; #* 消费者:声明 queue; #** 生产者、消费者'''共享队列''',所以 queue 名称需相同,且 routingKey 即为 queue名称; #* 发送消息:指定 routingKey(即:queue名称),而不指定 exchange; # '''Work模式''':(同上) # '''发布订阅''':消息发送到绑定的每一个队列。 #* 生产者:声明 exchange; #* 消费者:声明 exchange,声明 queue('''临时队列'''),且 binding(不指定 routingKey); #* 发送消息:指定 exchange,而不指定 routingKey;(消息发送到每一个队列,所以不需要声明 queue) # '''路由模式''':根据 routingKey '''相等''',消息发送到绑定的队列。 #* 生产者:声明 exchange; #* 消费者:声明 exchange,声明 queue('''临时队列'''),且 binding(指定 routingKey); #* 发送消息:指定 exchange,指定 routingKey; # '''主题模式''':根据 routingKey '''匹配''',消息发送到绑定的队列。 #* 生产者:声明 exchange; #* 消费者:声明 exchange,声明 queue('''临时队列'''),且 binding(指定 routingKey 通配符); #* 发送消息:指定 exchange,指定 routingKey; == Hello World == # 生产者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; public class Send { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] 发送 '" + message + "'"); } } } </syntaxhighlight> #* 可以使用 try-with-resource 语句,不必再显式地关闭资源;(Connection 和 Channel 都实现 java.io.Closeable) # 消费者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } } </syntaxhighlight> #* 在这里也声明了队列:因为我们可能会在发布服务器之前启动使用者,所以我们希望在尝试'''使用来自队列的消息之前确保该队列存在'''。 #* 不使用 try-with-resource 语句自动关闭通道和连接:因为我们希望在消费者异步侦听消息到达时,该过程保持活动状态。 == Work模式 == <pre> 工作队列是啥? 就是当遇到了运行耗时久的任务,并且还得等待它完成,这个时候就可以使用工作队列,把这个耗时任务发送给别的工人(消费者)进行处理,生产者可以直接得到处理完的情况。 </pre> 相关特性: # '''消息确认''':消费者发送通知告诉 MQ 处理完成了; # '''消息持久''':遇到了宕机后,会把消息给缓存或保存下来,使得下次启动能够不丢失(但不是百分百); # '''循环调度''':若开着两个或三个消费者的时候,当多个消息要接收,MQ 是会自动循环找下一个,避免一直重复同一个或几个; # '''公平派遣''':当有两个消费者的时候,若一个消费者一直再累死累活,另外一个逍遥自在,这是不利于效率提升的,故可以通过设置,限制若A忙就找B去。 # 生产者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; import java.nio.charset.StandardCharsets; public class NewTask { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 消息持久化(非百分百保证)。前提:队列第一次声明就是持久的,并且在生产者端和消费者端都要声明为true。 boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); for (int i = 0; i <= 5; i++) { String message = "第" + i + "个消息."; // MessageProperties.PERSISTENT_TEXT_PLAIN:将消息标记为持久性,确保即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。 channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] 发送 '" + message + "'"); } } } } </syntaxhighlight> #* 将消息标记为持久性并'''不能完全保证消息不会丢失''': #*: 尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 接受消息但尚未保存消息的时间窗口仍然很短。而且,RabbitMQ 并不是对每条消息都执行“fsync(2)”:它可能只是保存到缓存中,而不是真正写入磁盘。 #*: 持久性保证不强,但对于简单任务队列来说已经足够了。如果你需要一个更强有力的保证,那么你可以使用“'''publisher confirms'''”。 # 消费者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private final static String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); // 消息持久化(非百分百保证)。消费者端同发布者一样设置为 true boolean durable = true; channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null); System.out.println(" [*] 等待消息."); // 公平派遣设置:一次仅接受一条未经确认的消息,为了实现公平的安排(其他人闲着就安排其他人) channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); try { doWork(message); } catch (InterruptedException e) { e.getMessage(); } finally { System.out.println(" [x] 结束"); // 执行完成,返回确认消息。 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; // 消息确认:目的就是为了不丢失消息,使得确保每个消息有执行成功。(网络抖动也不怕,会找别人执行) boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); } private static void doWork(String task) throws InterruptedException { for (char ch : task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } } } </syntaxhighlight> RabbitMQ 查看当前队列有那些消息还未确定: <syntaxhighlight lang="bash" highlight=""> rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged Timeout: 60.0 seconds ... Listing queues for vhost / ... name messages_ready messages_unacknowledged work 0 3 hello 0 0 </syntaxhighlight> (messages_ready:等待接收的消息;messages_unacknowledged:已接受但未确认的消息) == 发布订阅 == fanout 交换非常简单,它只是'''将接收到的所有消息广播到它知道的所有队列'''。 不同于以上使用具有特定名称的队列(在生产者和消费者之间共享队列时,为队列指定名称非常重要),发布订阅模式中,我们希望听到所有日志消息,而不仅仅是其中的一个子集。我们也只对当前的流消息感兴趣,而不是对旧消息感兴趣。要解决这个问题,我们就需要'''临时队列''': # 一个新的、空的,随机名称的队列; # 一旦断开连接,队列就被自动删除。 在 Java客户机中,当我们不向“queueDeclare()”提供任何参数时,将创建一个非持久的、独占的、自动删除的队列,并使用生成的名称: <syntaxhighlight lang="Java" highlight=""> String queueName = channel.queueDeclare().getQueue(); // queueName包含一个随机队列名称,如:amq.gen-JzTY20BRgKO-hjmuj0wlg </syntaxhighlight> # 生产者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "info: Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] 发送 '" + message + "'"); } } } </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 接收 '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } </syntaxhighlight> RabbitMQ 查看绑定关系: <syntaxhighlight lang="bash" highlight=""> rabbitmqctl list_bindings </syntaxhighlight> == 路由模式 == # 生产者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String severity = "warning"; String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); } } } </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.*; public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName1 = channel.queueDeclare().getQueue(); String queueName2 = channel.queueDeclare().getQueue(); String[] severities; severities = {"info", "warning", "error"}; for (String severity : severities1) { channel.queueBind(queueName1, EXCHANGE_NAME, severity); } severities = {"info"}; for (String severity : severities2) { channel.queueBind(queueName2, EXCHANGE_NAME, severity); } System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 队列1接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { }); DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] 队列2接收到 '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { }); } } </syntaxhighlight> #* 如上,一个 queue 可以绑定多个 routingKey。 == 主题模式 == # 生产者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String[] routingKeys = new String[]{"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "quick.brown.fox", "quick.orange.male.rabbit", "lazy.orange.male.rabbit"}; for(String severity : routingKeys){ String message = "From " + severity + " routingKey' s message!"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } } } //.. } </syntaxhighlight> # 消费者: #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName1 = channel.queueDeclare().getQueue(); String queueName2 = channel.queueDeclare().getQueue(); String[] routingKeys; routingKeys = new String[]{"*.orange.*"}; for (String bindingKey : routingKeys) { channel.queueBind(queueName1, EXCHANGE_NAME, bindingKey); } routingKeys = new String[]{"*.*.rabbit", "lazy.#"}; for (String bindingKey : routingKeys) { channel.queueBind(queueName2, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] 等待消息."); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName1, true, deliverCallback1, consumerTag -> { }); DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { }); } } </syntaxhighlight> == Remote procedure call(RPC,远程过程调用) == RPC 模式,关于如何使用'''工作队列'''在多个 workers 之间分配耗时的任务。 <pre> 虽然RPC是计算中非常常见的模式,但它经常受到批评。当程序员不知道某个函数调用是本地调用还是慢速RPC时,就会出现问题。这样的混乱会导致不可预知的系统,并给调试增加不必要的复杂性。与简化软件相反,误用RPC会导致无法维护的意大利面代码。 考虑到这一点,请考虑以下建议: 确定哪个函数调用是本地的,哪个是远程的。 记录您的系统。明确组件之间的依赖关系。 处理错误案例。当RPC服务器长时间停机时,客户端应该如何反应? 如有疑问,请避免使用RPC。如果可以的话,应该使用异步管道,而不是类似RPC的阻塞,来将结果异步推送到下一个计算阶段。 </pre> 以下,将使用 RabbitMQ 构建一个 RPC 系统:一个客户端和一个可伸缩的 RPC 服务器。(因为我们没有任何值得分发的耗时任务,所以我们将创建一个返回斐波那契数的伪 RPC 服务) '''回调队列''':一般来说,在 RabbitMQ 上执行 RPC 很容易。客户端发送一条请求消息,服务器用一条响应消息进行回复。为了接收响应,我们需要随请求一起发送“回调”队列地址。 : 我们可以使用默认队列(在Java客户机中是独占的)。 : <syntaxhighlight lang="Java" highlight=""> callbackQueueName = channel.queueDeclare().getQueue(); BasicProperties props = new BasicProperties .Builder() .replyTo(callbackQueueName) .build(); channel.basicPublish("", "rpc_queue", props, message.getBytes()); </syntaxhighlight> * 以上内容需要一个新的导入: *: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.AMQP.BasicProperties; </syntaxhighlight> * '''消息属性''':AMQP 0-9-1协议预定义了消息附带的14个属性集。 除以下内容外,大多数属性很少使用: ** “deliveryMode”:将消息标记为持久(值为2)或暂时(任何其他值)。 ** “contentType”:用于描述编码的mime类型。 ** “'''replyTo'''”:通常用于命名回调队列。 ** “'''correlationId'''”:用于将 RPC 响应与请求关联起来。 : [[File:RabbitMQ:RPC示例.png|600px]] : [[File:RabbitMQ:RPC示例_1.png|600px]] 我们的 RPC 将像这样工作: # 对于 RPC 请求,客户端发送一条具有两个属性的消息:'''replyTo'''(设置为仅为该请求创建的匿名排他队列)和 '''correlationId'''(设置为每个请求的唯一值)。 # 该请求被发送到 '''rpc_queue''' 队列。 # RPC 工作程序(即:服务器)正在等待该队列上的请求:出现请求时,它完成工作,并会使用 '''replyTo''' 字段中的队列来并将带有结果的消息发送回客户端。 # 客户机等待应答队列上的数据:出现消息时,它会检查 '''correlationId''' 属性。 如果它与请求中的值匹配,则将响应返回给应用程序。 # 服务器代码非常简单: #* 同以前示例一样,从建立 connection,channel 和声明 queue 开始。 #* 我们可能要运行多个服务器进程:为了将负载平均分配到多个服务器上,我们需要在 channel.basicQos中 设置 prefetchCount 设置。 #* 我们使用 basicConsume 访问队列,在队列中我们以对象(DeliverCallback)的形式提供回调,该回调将完成工作并将响应发送回去。 #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.*; public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); } public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME); channel.basicQos(1); System.out.println(" [x] Awaiting RPC requests"); Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message); System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); // RabbitMq consumer worker thread notifies the RPC server owner thread synchronized (monitor) { monitor.notify(); } } }; channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { })); // Wait and be prepared to consume the message from RPC client. while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } </syntaxhighlight> # 客户机代码稍微复杂一些: #* 建立 connection 和 channel。 #* call 方法发出实际的 RPC 请求。 #* 在这里,我们首先生成一个唯一的 '''correlationId''' 号并保存它—我们的使用者回调将使用这个值来匹配相应的响应。 #* 然后,我们为回复创建一个专用的'''独占 queue''' 并订阅它。 #* 接下来,我们发布请求消息,其中包含两个属性:'''replyTo''' 和 '''correlationId'''。 #* 此时,我们可以等待直到正确的响应到达。 #* 因为我们的消费者传递处理是在一个单独的线程中进行的,所以在响应到达之前,我们需要一些东西来挂起主线程。使用 BlockingQueue 是一种可能的解决方案。在这里,我们正在创建容量设置为 1 的 '''ArrayBlockingQueue''',因为我们只需要等待一个响应。 #* 消费者正在做一项非常简单的工作,对于每个消费的响应消息,它都会检查 '''correlationId''' 是否就是我们要查找的。如果是这样,它将响应放入 '''BlockingQueue'''。 #* 同时,主线程正在等待响应将其从 '''BlockingQueue''' 中取出。 #* 最后,我们将响应返回给用户。 #: <syntaxhighlight lang="Java" highlight=""> import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException; public class RPCClient implements AutoCloseable { private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue"; public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); connection = factory.newConnection(); channel = connection.createChannel(); } public static void main(String[] argv) { try (RPCClient fibonacciRpc = new RPCClient()) { for (int i = 0; i < 32; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } } public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); final BlockingQueue<String> response = new ArrayBlockingQueue<>(1); String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); String result = response.take(); channel.basicCancel(ctag); return result; } public void close() throws IOException { connection.close(); } } </syntaxhighlight> 以上只是 RabbitMQ 中 RPC 服务的一个实现,你也可以根据业务需要实现更多。 * rpc有一个优点,如果一个RPC服务器处理不来,可以再增加一个、两个、三个。 上述代码还比较简单,还有很多问题没有解决: * 如果没有发现服务器,客户端如何处理? * 如果客户端的 RPC 请求超时了怎么办? * 如果服务器出现了故障,发生了异常,是否将异常发送到客户端? * 在处理消息前,怎样防止无效的消息?检查范围、类型?
返回至“
RabbitMQ:Java(工作模式)
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息