RabbitMQ:Publisher Confirms实现
关于
【以下内容来自官网:Publisher Confirms】 Publisher confirms 是一个 RabbitMQ 扩展,用于实现可靠的发布。
当在 channel 上启用 publisher confirms 时,代理将异步确认客户端发布的消息,这意味着它们已在服务器端得到处理。【???】
在 channel 上启用 publisher confirms
Publisher confirms 是 AMQP 0.9.1 协议的 RabbitMQ 扩展,因此默认情况下不启用它们。
使用 confirmSelect 方法在 channel 级别启用 Publisher confirms:
Channel channel = connection.createChannel();
channel.confirmSelect();
- 必须在您希望使用的每个通道上调用此方法。
- 确认只需被启用一次,而不是发布每条消息时都启用。
策略1:单独发布消息
从使用 confirms 发布消息的最简单方法开始,即发布消息并同步等待其确认:
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
// uses a 5 second timeout
channel.waitForConfirmsOrDie(5_000);
}
如上,使用 Channel#waitForConfirmsOrDie(long)
方法等待消息的确认:
- 消息一经确认,该方法即返回。
- 如果消息在超时时间内未被确认,或者消息未被确认(意味着代理由于某种原因无法处理它),则该方法将引发异常。
- 异常处理通常包括:记录错误消息,重试发送消息。
(不同的客户机库有不同的方法来同步处理 publisher confirms)
这种技术非常简单,但也有一个主要缺点:它会显著减慢发布速度,因为消息的确认会阻止所有后续消息的发布。
- 这种方法不会提供每秒超过几百条已发布消息的吞吐量。然而,对于某些应用程序来说,这已经足够好了。
Publisher Confirms 是异步的吗?
- 我们在开头提到,代理异步确认已发布的消息。但在第一个示例中,代码同步等待,直到消息被确认。客户端实际上异步接收确认,并相应地解除对 waitforconfirmsor 的调用的阻塞。把 waitforconfirmsodie 看作一个同步的助手,它依赖于后台的异步通知。
- 【即:异步确认,但同步阻塞发送消息】
策略2:批量发布消息
为了改进前面的示例,我们可以发布一批消息并等待整个批消息得到确认。以下示例使用 100 个批次:
int batchSize = 100;
int outstandingMessageCount = 0;
while (thereAreMessagesToPublish()) {
byte[] body = ...;
BasicProperties properties = ...;
channel.basicPublish(exchange, queue, properties, body);
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
等待一批消息被确认大大提高了吞吐量,而不是等待单个消息的确认(对于远程RabbitMQ节点,最多可提高20-30倍)。
一个缺点是:
- 我们不知道在失败的情况下到底出了什么问题,因此我们可能必须在内存中保留一整批内容来记录有意义的内容或重新发布消息。
- 而且这个解决方案仍然是同步的,因为它阻止了消息的发布。
策略3:异步处理发布确认
代理以异步方式确认已发布的消息,只需在客户端上注册一个回调就可以得到这些确认的通知:
Channel channel = connection.createChannel();
channel.confirmSelect();
channel.addConfirmListener((sequenceNumber, multiple) -> {
// code when message is confirmed
}, (sequenceNumber, multiple) -> {
// code when message is nack-ed
});
有两个回调:
- 一个用于 confirmed 消息,
- 另一个用于 nack-ed 消息(代理可以认为丢失的消息)。
每个回调有2个参数:
- sequenceNumber:序列号,用于标识 confirmed 或 nack-ed 消息的数字;
- 它与已发布的消息相关联;
- multiple:这是一个布尔值。如果为 false,则只一条消息被 confirmed/nack-ed;如果为 true,则sequenceNumber较低或相等的所有消息都被 confirmed/nack-ed。
在发布之前,可以通过 Channel#getNextPublishSeqNo()
获取序列号:
int sequenceNumber = channel.getNextPublishSeqNo()); ch.basicPublish(exchange, queue, properties, body);
将消息与 sequenceNumber 关联的简单方法在于使用映射。
- 假设我们想要发布字符串,因为它们很容易转换成字节数组进行发布。下面是一个代码示例,它使用映射将发布序列号与消息的字符串体关联起来:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); // ... code for confirm callbacks will come later String body = "..."; outstandingConfirms.put(channel.getNextPublishSeqNo(), body); channel.basicPublish(exchange, queue, properties, body.getBytes());
发布代码现在使用映射跟踪出站消息。我们需要在确认信息到达时清理此 Map,并在消息未确认时记录警告:
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
// ... publishing code
说明:上一个示例包含一个回调,在确认到达时清除映射(注意,这个回调处理单个和多个确认)。此回调在确认到达时使用(作为 Channel#addconfirmlister 的第一个参数)。nack-ed 消息的回调将检索消息正文并发出警告。然后,它重新使用上一个回调来清除未完成确认的映射(无论消息是已确认的还是未确认的,都必须删除映射中相应的条目)。
如何跟踪未完成的确认?:
- 我们的样本使用 ConcurrentNavigableMap 跟踪未完成的确认。这种数据结构方便有几个原因。它允许轻松地将序列号与消息(无论消息数据是什么)关联起来,并轻松地将条目清理到给定的序列 id(以处理多个 confirmed/nack-ed)。最后,它支持并发访问,因为 confirm 回调是在客户机库拥有的线程中调用的,它应该与发布线程保持不同。
- 与复杂的映射实现相比,还有其他方法可以跟踪未完成的映射,例如使用简单的并发哈希映射和变量来跟踪发布序列的下限,但它们通常更复杂,不属于教程。
(异步处理发布确认)总结
总之,异步处理 publisher confirms 通常需要以下步骤:
- 提供将发布序列号与消息关联的方法;
- 在通道上注册一个确认监听器,以便在发布者 ack/nack 到达时得到通知,以执行适当的操作,如记录或重新发布 nack-ed 消息;
- 序列号到消息关联机制在此步骤中可能还需要一些清理;
- 在发布消息之前跟踪发布序列号(sequenceNumber);
重新发布 nack-ed 消息?:
- 从相应的回调重新发布 nack-ed 消息是很有诱惑力的,但这应该避免,因为确认回调是在通道不应该执行操作的 I/O 线程中调度的。
- 更好的解决方案是将消息放入由发布线程轮询的内存队列中。类似 ConcurrentLinkedQueue 的类是在确认回调和发布线程之间传输消息的一个很好的候选者。
总结
在某些应用程序中,确保向代理发布消息是必不可少的。Publisher 确认有一个 RabbitMQ 特性可以帮助满足这个需求。Publisher 文件本质上是异步的,但也可以同步处理它们。没有明确的方法来实现,这通常归结为应用程序和整个系统中的约束。
典型技术包括:
- 单独发布消息,同步等待确认:简单,但吞吐量非常有限。
- 批量发布消息,同步等待一批消息的确认:简单、合理的吞吐量,但很难解释什么时候出错。
- 异步处理:最佳的性能和资源利用率,在发生错误时有良好的控制能力,但能够正确实现。
代码示例
PublisherConfirms.java:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.BooleanSupplier;
public class PublisherConfirms {
static final int MESSAGE_COUNT = 50_000;
static Connection createConnection() throws Exception {
ConnectionFactory cf = new ConnectionFactory();
cf.setHost("localhost");
cf.setUsername("guest");
cf.setPassword("guest");
return cf.newConnection();
}
public static void main(String[] args) throws Exception {
publishMessagesIndividually();
publishMessagesInBatch();
handlePublishConfirmsAsynchronously();
}
static void publishMessagesIndividually() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
ch.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
static void publishMessagesInBatch() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
long end = System.nanoTime();
System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
static void handlePublishConfirmsAsynchronously() throws Exception {
try (Connection connection = createConnection()) {
Channel ch = connection.createChannel();
String queue = UUID.randomUUID().toString();
ch.queueDeclare(queue, false, false, true, null);
ch.confirmSelect();
ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
if (multiple) {
ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
sequenceNumber, true
);
confirmed.clear();
} else {
outstandingConfirms.remove(sequenceNumber);
}
};
ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
String body = outstandingConfirms.get(sequenceNumber);
System.err.format(
"Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
body, sequenceNumber, multiple
);
cleanOutstandingConfirms.handle(sequenceNumber, multiple);
});
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
ch.basicPublish("", queue, null, body.getBytes());
}
if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
}
long end = System.nanoTime();
System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
}
}
static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
int waited = 0;
while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
Thread.sleep(100L);
waited = +100;
}
return condition.getAsBoolean();
}
}