RabbitMQ:Consumer Acknowledgements and Publisher Confirms
关于
【以下内容来自官网:Consumer Acknowledgements and Publisher Confirms】
根据定义,使用消息传递代理(如RabbitMQ)的系统是分布式的。由于发送的协议方法(消息)不能保证到达对等方或由对等方成功处理,因此发布者和使用者都需要一种传递和处理确认机制:
- RabbitMQ 支持的几个消息传递协议提供了这样的特性。(本指南介绍了 AMQP 0-9-1 中的特性,但在其他受支持的协议中,其思想基本相同)
- 从消费者到 RabbitMQ 的交付(delivery)处理确认在消息传递协议中称为确认(Consumer Acknowledgements);
- 代理对发布者的确认是一个称为发布者确认(Publisher Confirms)的协议扩展。
- 这两个特性都基于相同的思想,并受到 TCP 的启发。
它们对于“从发布者到 RabbitMQ 节点”以及“从 RabbitMQ 节点到消费者”的可靠传递都是必不可少的。换句话说,它们对于数据安全至关重要,应用程序和 RabbitMQ 节点对数据安全负有同样的责任。
(消费者)交付确认
当 RabbitMQ 向使用者传递消息时,它需要知道何时考虑成功发送消息。什么样的逻辑是最优的取决于系统。因此,这主要是一个应用决策。
- 在 AMQP 0-9-1 中,当使用 basic.consume 方法注册消费者或使用 basic.get 方法按需获取消息时,会进行此操作。
传递标识符:“delivery tag”
注册消费者(订阅)时,RabbitMQ 将使用 basic.deliver 方法传递(推送)消息。【???】
该方法携带一个 delivery tag(传递标记):它唯一地标识通道上的传递。
- delivery tag 是单调的正整数,并通过客户端库呈现。 确认交付的客户端库方法将传递标记作为参数。
- delivery tag 的作用域是每个通道。
- 所以必须在接收它们的同一通道上进行确认。在其他通道上确认将导致“unknown delivery tag”协议异常并关闭通道。
消费者确认模式 和 数据安全注意事项
当节点将消息传递给消费者时,它必须决定该消息是否应该被认为是由使用者处理(或至少是接收)的。由于多个事物(客户端连接、消费者应用程序等)可能会失败,因此此决策是一个数据安全问题。
消息传递协议通常提供一种确认机制,允许使用者确认到他们所连接的节点的传递。
- 是否使用该机制在消费者订阅时决定。
根据所使用的确认模式,Rabbit MQ可以考虑消息在发出(写入TCP套接字)后立即成功传递,或者在收到显式(“手动”)客户端确认时成功传递:
- 手动确认模式:手动发送的确认可以是肯定的或否定的,并使用以下协议方法之一:
- basic.ack:用于肯定确认;
- basic.reject:用于否定确认,但与 basic.nack 相比有一个限制(没有“multiple”字段);
- basic.nack:用于否定确认;(注意:这是对AMQP 0-9-1的RabbitMQ扩展)
- 肯定确认只是指示 RabbitMQ 将消息记录为已传递,并且可以丢弃。带有 basic.reject 的否定确认具有相同的效果。
- 区别主要在语义上:肯定的确认假设消息已成功处理,而否定的确认则表示未处理传递,但仍应删除。
- 用于传递确认的 API 方法通常公开为客户端库中通道上的操作。
- 自动确认模式:消息在发送后立即被视为成功传递。
- 这种模式会为了更高的吞吐量(只要用户能够跟上),而降低交付和用户处理的安全性。这种模式通常被称为“fire-and-forget”。
- 与手动确认模型不同,如果在成功传递之前关闭了用户的TCP连接或通道,则服务器发送的消息将丢失。因此,自动消息确认应该被认为是不安全的,并且不适合所有的工作负载。
- 在使用自动确认模式时需要考虑的另一件事是消费者过载:
- 手动确认模式通常与有界信道预取的使用,这限制了通道上的未完成的数量(“正在进行”)。但是,通过自动确认,定义没有此类限制。因此,消费者可以通过交付速度来淹没,潜在地累积存储器中的积压并耗尽堆或者通过操作系统终止他们的进程。某些客户端库将应用 TCP 背负压力(停止从套接字读取,直到未处理的交付的积压超过一定限度)。因此,自动确认模式仅适用于可以有效地且以稳定的速率处理交付的消费者。
肯定确认交付
Java客户端用户将使用 Channel#basicAck
和 Channel#basicNack
分别执行 basic.ack 和 basic.nack。
下面是一个Java客户机示例,演示了一个肯定的确认:
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will be discarded
channel.basicAck(deliveryTag, false);
}
});
一次确认多个交付
手动确认可以成批处理以减少网络流量。
通过将确认方法(肯定或否定确认)的“multiple”字段(见上文)设置为 true ,RabbitMQ 将确认所有未完成的交付标记,包括确认中指定的标记。
- 作用域是通道。
- 注意,basic.reject 在历史上没有该字段,这就是为什么 basic.nack 被 RabbitMQ 作为协议扩展引入的原因。
例如:假设信道 Ch 上存在未确认的 delivery_tag 为 5、6、7 和 8 ,当确认帧到达该信道时,delivery_tag 设置为 8 并且 multiple 设置为 true,从 5 到 8 的所有标签都将被确认。如果将 multiple 设置为 false,则传递 5、6 和 7 仍然未确认。
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "a-consumer-tag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge all deliveries up to this delivery tag
channel.basicAck(deliveryTag, true);
}
});
否定确认 和 交付重新排队
有时消费者无法立即处理交付,但其他实例可能可以。在这种情况下,可能需要重新对其进行排队,并让另一个消费者接收和处理它。
- 否定确认:basic.reject 和 basic.nack 是用于此的两种协议方法。
- 重新排队:由“requeue”字段控制:设置为true时,代理将使用指定的 delivery tag 对交付(或多个交付)进行重新排队。
Java 客户端用户将使用Channel#basicject
和Channel#basicNack
分别执行 basic.reject 和 basic.nack:
- 否定确认,并丢弃消息:
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // 否定确认, 消息将会被丢弃 channel.basicReject(deliveryTag, false); } });
- 否定确认,并重新排序:
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // 否定确认, 消息将会被重新排队 channel.basicReject(deliveryTag, true); } });
- 批量否定确认并重新排队:basic.nack方法接受一个附加参数 multiple,用于一次拒绝或重新请求多条消息:
- 这就是它与 basic.reject 的区别。
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); // 对这个 deliveryTag 的所有 delivery 重新排队(批量否定确认并重新排队) channel.basicNack(deliveryTag, true, true); } });
重新排序的位置,以及带来的问题和解决:
- 当一条消息被重新排队时,如果可能的话,它将被放在其队列中的原始位置。否则(由于多个使用者共享一个队列时并发传递和来自其他使用者的确认),则消息将被重新请求到更靠近队列头的位置。
- Requeed 消息可以立即准备好重新交付:具体取决于它们在队列中的位置和具有活动消费者的通道使用的“预取(prefetch)”值。这意味着,如果所有的消费者因为暂时的情况而不能处理交付而重新排队,他们将创建一个“重新排队/重新交付”的循环,这样的循环在网络带宽和 CPU 资源方面可能代价高昂。
- 消费者实现可以跟踪重新获取的数量,并永久拒绝消息(丢弃它们)或在延迟之后安排重新安排。
通道预取设置(QoS)
因为消息是异步发送(推送)到客户机的,所以在任何给定的时刻,通道上通常有多条消息“正在传输”。此外,来自客户端的手动确认本质上也是异步的。所以有一个传送标签的滑动窗口是未确认的。开发人员通常倾向于限制此窗口的大小,以避免消费者端的“无限缓冲区问题”。
这是通过使用 basic.qos 方法设置“预取计数”值来实现的:该值定义通道上允许的最大未确认传递数。一旦数量达到配置的计数,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未完成的消息得到确认(值 0 被视为无限大,允许任意数量的未确认消息)。
- 例如,假设信道 Ch 上有未确认的传递标签 5、6、7 和 8,并且信道 Ch 的预取计数设置为 4,则 RabbitMQ 不会在 Ch 上推送任何更多的传递,除非至少有一个未完成的传递被确认。当确认帧到达该通道时,delivery tag 设置为5(或6、7或8),RabbitMQ 将注意到并传递另一条消息。
- 一次确认多条消息将使多条消息可供传递。
- 如果在传输中已经有传递的情况下更改了预取值,则会出现自然竞争条件,并且在一个通道上可能暂时有超过预取计数的未确认消息。
预取:“每个通道”、“每个用户”和“全局”
可以为特定信道或特定消费者配置QoS设置。
预取和轮询消费者
QoS 预取设置对使用 basic.get(“pull API”)获取的消息没有影响,即使在手动确认模式下也是如此。
消费者确认模式、预取和吞吐量
确认模式和QoS预取值对用户吞吐量有显著影响。一般来说,:
- 增加预将提高向用户传递消息的速率。
- 自动确认模式产生最佳的传送速率。
但是,在这两种情况下,已传递但尚未处理的消息的数量也将增加,从而增加消费者 RAM 的消耗。
应小心使用带有无限预取的自动确认模式或手动确认模式:消费大量消息而不进行确认的消费者将导致他们所连接的节点上的内存消耗增长。找到一个合适的预取值是一个反复试验的问题,并且会因工作负载而异。
- 在 100 到 300 范围内的值通常提供最佳吞吐量,并且不会有压倒消费者的重大风险。更高的值通常常常陷入递减递减的规律。
- 预取值 1 是最保守的。它将显著降低吞吐量,特别是在使用者连接延迟很高的环境中。对于许多应用程序,较高的值将是适当和最佳的。
当用户失败或失去连接时:自动重新排队
当使用手动确认时,当发生传递的通道(或连接)关闭时,任何未确认的传递(消息)将自动重新发出。
- 这包括客户端的 TCP 连接丢失、使用者应用程序(进程)失败和通道级协议异常(如下所述)。
- 注意,检测不可用的客户端需要一段时间。
由于这种行为,消费者必须准备好处理重新交付,否则在实现时必须考虑幂等性。Redeliveries 将有一个特殊的布尔属性 redeliver,由 RabbitMQ 设置为 true。对于首次交付,它将被设置为false。【???】
- 注意,使用者可以接收以前传递给另一个使用者的消息。
客户端错误:双重确认和未知标记
- 如果客户端多次确认同一个传递标记,RabbitMQ 将导致一个通道错误,例如
PRECONDITION\u FAILED-unknown delivery tag 100
。 - 如果使用未知传递标记,则会引发相同的通道异常。
- 在另一种情况下,代理会抱怨“unknown delivery tag”,即在与接收传递的通道不同的通道上尝试确认(无论是肯定的还是否定的)。
- 必须在同一 channel 上确认交付。
发布者确认
【见:“RabbitMQ:Publisher Confirms实现”】
网络可能以不太明显的方式出现故障,检测某些故障需要时间。因此,将协议帧或一组帧(例如,已发布的消息)写入其套接字的客户端不能假定消息已到达服务器并已成功处理。它可能会在途中丢失,也可能会严重延迟交货。
使用标准的 AMQP 0-9-1,确保消息不会丢失的唯一方法是使用事务——使通道具有事务性,然后为每个消息或消息集发布、提交。
- 在这种情况下,事务是不必要的重量级,并将吞吐量降低了250倍。
- 它模仿协议中已经存在的使用者确认机制。
要启用确认,客户端将发送 confirm.select 方法。根据是否设置 no-wait,代理可能会以 confirm.select-ok 响应。
- 一旦在频道上使用 confirm.select 方法,则表示处于确认模式。
- 事务性通道不能置于确认模式,一旦通道处于确认模式,就不能使其成为事务性通道。【???】
一旦通道处于确认模式,代理和客户机都会对消息进行计数(计数从第一次 confirm.select 时的 1 开始)。然后,代理在处理消息时通过在同一通道上发送 basic.ack 来确认消息。delivery tag 字段包含已确认邮件的序列号。代理还可以在 basic.ack 中设置 multiple 字段,以指示已处理到并包括具有序列号的消息的所有消息。
发布者的否定确认
在特殊情况下,当代理无法成功处理消息时,代理将发送 basic.nack,而不是 basic.ack。
- 在此上下文中,basic.nack 的字段与 basic.ack 中相应的字段具有相同的含义,并且应忽略 requeue 字段。
通过 nack'ing 一个或多个消息,代理表明它无法处理这些消息并拒绝对它们负责;此时,客户机可以选择重新发布消息。
通道进入确认模式后,所有随后发布的消息将被 confirmed 或 nack'd 一次。
- 对于消息的确认时间没有任何保证。
- 没有消息会同时被 confirmed 或 nack'd。
只有在负责队列的Erlang进程中发生内部错误时,才会传递 basic.nack。【???】
已发布的消息何时会被代理(broker)确认?
- 对于不可路由的消息:一旦 exchange 验证消息不会路由到任何队列(返回一个空的队列列表),代理将发出一个 confirm。
- 如果消息也被强制发布,则 basic.return 将在 basic.ack 之前发送到客户端。【???】
- 否定应答也是如此(basic.nack)。
- 对于可路由消息,当消息已被所有队列接受时,将发送 basic.ack。
- 对于路由到持久队列的持久消息,这意味着持久到磁盘。
- 对于镜像队列,这意味着所有镜像都已接受该消息。
持久消息的 ACK 延迟
路由到持久队列的持久消息,其 basic.ack 将在将消息持久化到磁盘后发送。RabbitMQ 消息存储在一段时间间隔(几百毫秒)后,或者当队列空闲时,将消息批量保存到磁盘,以尽量减少 fsync(2) 调用的次数。
这意味着在恒定负载下,basic.ack 的延迟可以达到几百毫秒。
为了提高吞吐量,强烈建议应用程序异步处理确认(作为流)或发布成批消息并等待未完成的确认。
- 具体的API在客户机库之间有所不同。
发布者确认的顺序
在大多数情况下,RabbitMQ将以发布消息的相同顺序向发布者确认消息(这适用于在单个通道上发布的消息)。但是,发布者确认是异步发出的,可以确认单个消息或一组消息。发出确认消息的确切时间取决于消息的传递模式(持久与暂时)以及消息路由到的队列的属性(见上文)。
这就是说,不同的消息可以被认为在不同的时间准备好确认。
这意味着与各自的消息相比,确认可以以不同的顺序到达。在可能的情况下,应用程序不应依赖于确认的顺序。
发布者确认 和 保证交付
如果RabbitMQ节点在将所述消息写入磁盘之前失败,则该节点可能会丢失持久消息。
例如,考虑以下场景:
- 客户机将持久消息发布到持久队列
- 客户机使用队列中的消息(注意消息是持久的,队列是持久的),但确认消息不是活动的,
- 代理节点失败并重新启动,并且
- 客户端重新连接并开始使用消息
此时,客户机可以合理地假设消息将再次传递。
事实并非如此:重新启动已导致代理丢失消息。为了保证持久性,客户机应该使用 confirms。如果发布者的 channel 处于确认模式,则发布者不会收到丢失消息的 ack(因为消息尚未写入磁盘)。
限制
最大的 delivery tag
Delivery tag 是一个 64 位 长的值,因此它的最大值是 9223372036854775807。
- 由于传递标记的作用域是每个通道的,因此发布者或使用者在实践中不太可能超过此值。