Kafka:API
关于
Apache Kafka引入一个新的java客户端(在org.apache.kafka.clients 包中),替代老的Scala客户端,但是为了兼容,将会共存一段时间。
- 为了减少依赖,这些客户端都有一个独立的jar,而旧的Scala客户端继续与服务端保留在同个包下。
Kafka 有 5 个核心 API:
- Producer API:允许应用程序发送数据流到kafka集群中的topic。
- Consumer API:允许应用程序从kafka集群的topic中读取数据流。
- Streams API:允许从输入topic转换数据流到输出topic。【???】
- Connect API:通过实现连接器(connector),不断地从一些源系统或应用程序中拉取数据到kafka,或从kafka提交数据到宿系统(sink system)或应用程序。【???】
- Admin API:用于管理和检查topic,broker和其他Kafka对象。
kafka 公开了其所有的功能协议,与语言无关。
只有java客户端作为kafka项目的一部分进行维护,其他的作为开源的项目提供,这里提供了非java客户端的列表。 https://cwiki.apache.org/confluence/display/KAFKA/Clients
Producer API
我们鼓励所有新开发的程序使用新的Java生产者,新的java生产者客户端比以前的Scala的客户端更快、功能更全面。
引入Maven(可以更改新的版本号):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
kafka客户端 发布 record(消息) 到 kafka集群
新的生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。
示例,使用producer发送一个有序的key/value(键值对):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
其中:
- 生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会丢失这些消息。
- send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。
- 生产者将单个的消息批量在一起发送来提高效率。
- ack是判别请求是否为完整的条件(就是是判断是不是成功发送了)。
- 指定了“all”将会阻塞消息,这种设置性能最低,但是最可靠。
- retries,如果请求失败,生产者会自动重试,我们指定是 0 次。
- 如果启用重试,则会有重复消息的可能性。
- batch.size指定缓存的大小。【producer(生产者)缓存每个分区未发送的消息】
- 值较大的话将会产生更大的批,并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
- linger.ms指定发送请求逗留时间。
- 默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置 linger.ms 大于 0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。
- 这类似于TCP的算法,例如上面的代码段,可能 100 条消息在一个请求发送,因为我们设置了linger(逗留)时间为 1 毫秒,然后,如果我们没有填满缓冲区,这个设置将增加 1 毫秒的延迟请求以等待更多的消息。
- 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比 0 大,以少量的延迟代价换取更少的、更有效的请求。
- buffer.memory 控制生产者可用的缓存总量,如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间。
- 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms 设定,之后它将抛出一个 TimeoutException。
- key.serializer 和 value.serializer 示例,将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的 ByteArraySerializaer 或 StringSerializer 处理简单的 string 或 byte 类型。
幂等和事务
从Kafka 0.11开始,Kafka Producer 又支持两种模式:
- 幂等生产者:加强了 Kafka 的交付语义,从至少一次交付到精确一次交付。
- 幂等生产者特别是生产者的重试将不再引入重复。
- 事务生产者:事务性生产者允许应用程序原子地将消息发送到多个分区(和主题!)。
- 要启用幂等(idempotence),必须将“enable.idempotence”配置设置为true。
- 如果设置,则“retries”(重试)配置将默认为“Integer.MAX_VALUE”,“acks”配置将默认为“all”。API没有变化,所以无需修改现有应用程序即可利用此功能。
- 此外,如果 send(ProducerRecord) 即使在无限次重试的情况下也会返回错误(例如消息在发送前在缓冲区中过期),那么建议关闭生产者,并检查最后产生的消息的内容,以确保它不重复。
- 最后,生产者只能保证单个会话内发送的消息的幂等性。
- 要使用“事务生产者”和 attendant API,必须设置“transactional.id”。
- 如果设置了transactional.id,幂等性会和幂等所依赖的生产者配置一起自动启用。
- 此外,应该对包含在事务中的 topic 进行耐久性配置。特别是,“replication.factor”应该至少是 3,而且这些 topic 的“min.insync.replicas”应该设置为 2。
- 最后,为了实现从端到端的事务性保证,消费者也必须配置为只读取已提交的消息。
- transactional.id 的目的是实现单个生产者实例的多个会话之间的事务恢复。它通常是由分区、有状态的应用程序中的分片标识符派生的。因此,它对于在分区应用程序中运行的每个生产者实例来说应该是唯一的。
- 所有新的事务性 API 都是阻塞的,并且会在失败时抛出异常。
下面的例子说明了新的 API 是如何使用的。它与上面的例子类似,只是所有 100 条消息都是一个事务的一部分:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
如上所示:
- 每个生产者只能有一个未完成的事务。在“beginTransaction()”和“commitTransaction()”调用之间发送的所有消息都将是单个事务的一部分。当指定“transactional.id”时,生产者发送的所有消息都必须是事务的一部分。
- 事务生产者使用异常来传递错误状态。特别是,不需要为“producer.send()”指定回调,也不需要在返回的 Future 上调用 .get():如果任何 producer.send() 或事务性调用在事务过程中遇到不可恢复的错误,就会抛出KafkaException。
- 该客户端可以与0.10.0或更高版本的 broker 进行通信。旧的或较新的 broker 可能不支持某些客户端功能。例如,事务性API需要 0.11.0或更新版本的broker。当调用在运行的broker版本中不可用的API时,您将收到UnsupportedVersionException。
send()
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
异步发送一条消息到 topic,并调用 callback(当发送已确认)。
- send是异步的,并且一旦消息被保存在“等待发送的消息缓存”中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。
- 发送的结果是一个 RecordMetadata,它指定了:消息发送的分区,分配的 offset 和消息的时间戳。
- 如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果topic使用的是 LogAppendTime,则追加消息时,时间戳是broker的本地时间。
Throws:
- InterruptException:如果线程在阻塞中断。
- SerializationException:如果key或value不是给定有效配置的serializers。
- TimeoutException:如果获取元数据或消息分配内存话费的时间超过max.block.ms。
- KafkaException:Kafka有关的错误(不属于公共API的异常)。
由于 send 调用是异步的,它将为分配消息的此消息的 RecordMetadata 返回一个 Future。如果 future 调用 get() 则将阻塞,直到相关请求完成并返回该消息的 metadata,或抛出发送异常。【???】
如果要模拟一个简单的阻塞调用,可以调用get()方法:
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知:
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
});
发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
- 注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的Executor来并行处理。
Consumer API
随着0.9.0版本,我们已经增加了一个新的Java消费者替换我们现有的基于zookeeper的高级和低级消费者。这个客户端还是测试版的质量。为了确保用户平滑升级,我们仍然维护旧的0.8版本的消费者客户端继续在0.9集群上工作,两个老的0.8 API的消费者( 高级消费者 和 低级消费者)。
这个新的消费API,清除了0.8版本的高版本和低版本消费者之间的区别,可以通过下面的maven,引入依赖到你的客户端:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
kafka客户端从kafka集群中获取消息,并透明地处理kafka集群中出现故障broker,透明地调节适应集群中变化的数据分区。也和broker交互,负载平衡消费者。
public class KafkaConsumer<K,V>
extends Object
implements Consumer<K,V>
- 消费者维护着与broker的TCP连接来获取消息,如果在使用后没有关闭消费者,则会泄露这些连接。
- 消费者不是线程安全的。
跨版本兼容性
该客户端可以与0.10.0或更新版本的broker集群进行通信。较早的版本可能不支持某些功能。
- 例如,0.10.0 broker不支持 offsetsForTimes,因为此功能是在版本0.10.1中添加的。
如果你调用broker版本不可用的API时,将报 UnsupportedVersionException 异常。
offset(偏移量)和消费者位置
kafka为分区中的每条消息保存一个偏移量(offset),这个偏移量是该分区中一条消息的唯一标示。也表示消费者在分区的位置。
- 例如,一个位置是 5 的消费者(说明已经消费了0到4的消息),下一个将接收消息的偏移量为 5 的消息。
实际上这有两个与消费者相关的 “位置” 概念:
- 消费者的位置给出了下一条消息的偏移量。它比消费者在该分区中看到的最大偏移量要大一个。它在每次消费者在调用“poll(Duration)”中接收消息时自动增长。
- 已提交的位置是已安全保存的最后偏移量,如果进程失败或重新启动时,消费者将恢复到这个偏移量。
- 消费者可以选择定期自动提交偏移量,也可以选择通过调用 commit API来手动的控制【如:commitSync(同步提交)和 commitAsync(异步提交)】。
这个主要区别是消费者来控制一条消息什么时候才被认为是已被消费的,控制权在消费者。
消费者组和主题订阅
Kafka的消费者组概念,通过 进程池 瓜分消息并处理消息。
- 这些进程可以在同一台机器运行,也可分布到多台机器上,以增加可扩展性和容错性,相同 group.id 的消费者将视为同一个消费者组。
从概念上讲,你可以将消费者分组看作是“由多个进程组成的单一逻辑订阅者”。作为一个多订阅系统,Kafka支持对于给定topic任何数量的消费者组,而不重复:
- 所有进程都将是单个消费者分组的一部分(类似传统消息传递系统中的队列的语义),因此消息传递就像队列一样,在组中平衡。
- 与传统的消息系统不同的是,虽然,你可以有多个这样的组。但每个进程都有自己的消费者组(类似于传统消息系统中 pub-sub 的语义),因此每个进程都会订阅到该主题的所有消息。
- 【???即,一个 client(线程):对应一个(获取一个topic)consumer group;对应多个(获取多个topic)consumer group】
组中的每个消费者都通过 subscribe API动态的订阅一个 topic 列表。kafka将已订阅topic的消息发送到每个消费者组中。并通过平衡分区在消费者分组中所有成员之间来达到平均。
- 因此(一个消费者组中)每个分区恰好地分配1个消费者。
- 所有如果一个topic有 4 个分区,并且一个消费者分组有只有 2 个消费者。那么每个消费者将消费 2 个分区。
重新平衡分组:即,消费者组的成员是动态维护的:
- 如果一个消费者故障。分配给它的分区将重新分配给同一个分组中其他的消费者。
- 同样的,如果一个新的消费者加入到分组,将从现有消费者中移一个给它。
- 当新分区添加到订阅的topic时,或者当创建与订阅的正则表达式匹配的新topic时,也将重新平衡。将通过定时刷新自动发现新的分区,并将其分配给分组的成员。
当分组重新分配自动发生时,可以通过 ConsumerRebalanceListener 通知消费者,这允许他们完成必要的应用程序级逻辑,例如状态清除,手动偏移提交等。
- 它也允许消费者通过使用 assign(Collection) 手动分配指定分区,如果使用手动指定分配分区,那么动态分区分配和协调消费者组将失效。
发现消费者故障
订阅一组 topic 后,当调用“poll(long)”时,消费者将自动加入到组中。只要持续的调用 poll,消费者将一直保持可用,并继续从分配的分区中接收消息。
- 此外,消费者向服务器定时发送心跳。 如果消费者崩溃或无法在 session.timeout.ms 配置的时间内发送心跳,则消费者将被视为死亡,并且其分区将被重新分配。
还有一种可能,消费可能遇到“活锁”的情况:它持续的发送心跳,但是没有处理。
- 为了预防消费者在这种情况下一直持有分区,我们使用 max.poll.interval.ms 活跃检测机制。
- 在此基础上,如果你调用的 poll 的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。
- 发生这种情况时,你会看到 offset 提交失败(调用“commitSync()”引发的 CommitFailedException)。
这是一种安全机制,保障只有活动成员能够提交 offset。所以要留在组中,必须持续调用poll。
消费者提供两个配置设置来控制 poll 循环:
- max.poll.interval.ms:增大 poll 的间隔,可以为消费者提供更多的时间去处理返回的消息(调用 poll 返回的消息通常都是一批)。
- 缺点是此值越大将会延迟组重新平衡。
- max.poll.records:限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。
- 通过调整此值,可以减少poll间隔,减少重新平衡分组的。
对于消息处理时间不可预测地的情况,这些选项是不够的。 处理这种情况的推荐方法是:将消息处理移到另一个线程中,让消费者继续调用 poll。 但是必须注意确保已提交的 offset 不超过实际位置。
- 另外,你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。
- 还要注意,你需要 pause 暂停分区,不会从 poll 接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。
【???】
示例
自动提交偏移量(Automatic Offset Committing)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
如上,在这个例子中,客户端订阅了主题 foo 和 bar,消费者组叫 test:
- 设置 enable.auto.commit,偏移量由 auto.commit.interval.ms 控制自动提交的频率。
- 集群是通过配置 bootstrap.servers 指定一个或多个 broker。不用指定全部的broker,它将自动发现集群中的其余的borker(最好指定多个,万一有服务器故障)。
- broker 通过心跳机器自动检测 test 组中失败的进程,消费者会自动 ping 集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过session.timeout.ms,那么就会认为是故障的,它的分区将被分配到别的进程。
- 这个 deserializer 设置如何把 byte 转成 object 类型,例子中,通过指定 string 解析器,我们告诉获取到的消息的 key 和 value 只是简单个 string 类型。
手动控制偏移量(Manual Offset Control)
不需要定时的提交 offset,可以自己控制 offset:当消息认为已消费过了,这个时候再去提交它们的偏移量。
- 使用手动偏移控制的优点是,您可以直接控制记录何时被视为“已消耗”。
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "false");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
如上:
- 我们将消费一批消息并将它们存储在内存中,当我们积累足够多的消息后,我们再将它们批量插入到数据库中。如果我们设置 offset 自动提交,消费将被认为是已消费的。
- 这样会出现问题,我们的进程可能在批处理记录之后,但在它们被插入到数据库之前失败了。
- 为了避免这种情况,我们将在相应的记录插入数据库之后再手动提交偏移量。这样我们可以准确控制消息是成功消费的。
- 提出一个相反的可能性:在插入数据库之后,但是在提交之前,这个过程可能会失败(即使这可能只是几毫秒,这是一种可能性)。在这种情况下,进程将获取到已提交的偏移量,并会重复插入的最后一批数据。
这种方式就是所谓的“至少一次”(At least once)保证,在故障情况下,可以重复。
- 如果您无法执行这些操作,可能会使已提交的偏移超过消耗的位置,从而导致缺少记录。
注意:
- 使用自动提交也可以“至少一次”【???】。
- 但是要求你必须下次调用 poll(Duration) 之前或关闭消费者之前,处理完所有返回的数据。如果操作失败,这将会导致已提交的 offset 超过消费的位置,从而导致丢失消息。
上面的例子使用 commitSync 表示所有收到的消息为“已提交”,在某些情况下,你可以希望更精细的控制,通过指定一个明确消息的偏移量为“已提交”。
如下,我们处理完每个分区中的消息后,提交偏移量:
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
注意:
- 已提交的 offset 应始终是你的程序将读取的下一条消息的offset。因此,调用 commitSync(offsets) 时,你应该加 1 个到最后处理的消息的 offset。
订阅指定的分区(Manual Partition Assignment)
在前面的例子中,我们订阅我们感兴趣的topic,让kafka提供给我们平分后的topic分区。
但是,在有些情况下,你可能需要自己来控制分配指定分区,例如:
- 如果这个消费者进程与该分区保存了某种本地状态(如本地磁盘的键值存储),则它应该只能获取这个分区的消息。
- 如果消费者进程本身具有高可用性,并且如果它失败,会自动重新启动(可能使用集群管理框架,如 YARN,Mesos,或者 AWS 设施,或作为一个流处理框架的一部分)。
- 在这种情况下,不需要Kafka检测故障,重新分配分区,因为消费者进程将在另一台机器上重新启动。
要使用此模式,只需调用 assign(Collection) 消费指定的分区即可:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
一旦手动分配分区,你可以在循环中调用 poll(跟前面的例子一样)。消费者分组仍需要提交offset,只是现在分区的设置只能通过调用 assign 修改,因为手动分配不会进行分组协调,因此消费者故障不会引发分区重新平衡。
- 每一个消费者是独立工作的(即使和其他的消费者共享GroupId)。
- 为了避免offset提交冲突,通常你需要确认每一个 consumer 实例的 gorupId 都是唯一的。【???】
注意:
- 手动分配分区(即,assgin)和动态分区分配的订阅topic模式(即,subcribe)不能混合使用。
在 Kafka 之外存储偏移量【???】
消费者可以不使用 kafka 内置的 offset 仓库,而选择自己来存储 offset。
要注意的是:
- 将消费的offset和结果存储在同一个的系统中,用原子的方式存储结果和offset,但这不能保证原子,要想消费是完全原子的,并提供的“正好一次”(Exactly once)的消费保证比kafka默认的“至少一次”(At least once)的语义要更高。你需要使用kafka的offset提交功能。【???】
这有结合的例子:
- 如果消费的结果存储在关系数据库中,存储在数据库的offset,让提交结果和offset在单个事务中。这样,事物成功,则offset存储和更新。如果offset没有存储,那么偏移量也不会被更新。
- 如果offset和消费结果存储在本地仓库。例如,可以通过订阅一个指定的分区并将offset和索引数据一起存储来构建一个搜索索引。如果这是以原子的方式做的,常见的可能是,即使崩溃引起未同步的数据丢失。索引程序从它确保没有更新丢失的地方恢复,而仅仅丢失最近更新的消息。
每个消息都有自己的offset,所以要管理自己的偏移,你只需要做到以下几点:
- 配置 enable.auto.commit=false
- 使用提供的 ConsumerRecord 来保存你的位置。
- 在重启时用 seek(TopicPartition, long) 恢复消费者的位置。
自己存储 offset 时,需要注意分区分配:
- 如果分区分配也是手动完成的(像上文搜索索引的情况),这种类型的使用是最简单的。
- 如果分区分配是自动完成的,需要特别小心处理分区分配变更的情况。
- 可以通过调用“subscribe(Collection,ConsumerRebalanceListener)”和“subscribe(Pattern,ConsumerRebalanceListener)”中提供的 ConsumerRebalanceListener 实例来完成的。
例如:
- 当分区向消费者获取时【?】,消费者将通过实现 “ConsumerRebalanceListener.onPartitionsRevoked(Collection)” 来给这些分区提交它们offset。
- 当分区分配给消费者时,消费者通过“ConsumerRebalanceListener.onPartitionsAssigned(Collection)”为新的分区正确地将消费者初始化到该位置。
ConsumerRebalanceListener 的另一个常见用法是清除应用已移动到其他位置的分区的缓存。【???】
指定消费位置
大多数情况下,消费者只是简单的从头到尾的消费消息,周期性的提交位置(自动或手动)。kafka也支持消费者去手动的控制消费的位置,可以消费之前的消息也可以跳过最近的消息。 有几种情况,手动控制消费者的位置可能是有用的: 1、 对于时间敏感的消费者处理程序,对足够落后的消费者,直接跳过,从最近的消费开始消费。 2、 本地状态存储系统。(上一节说的)在这样的系统中,消费者将要在启动时初始化它的位置(无论本地存储是否包含)。同样,如果本地状态已被破坏(假设因为磁盘丢失),则可以通过重新消费所有数据并重新创建状态(假设kafka保留了足够的历史)在新的机器上重新创建。
kafka 使用“seek(TopicPartition, long)”指定新的消费位置。
- 用于查找服务器保留的最早和最新的offset的特殊的方法也可用“seekToBeginning(Collection)”和“seekToEnd(Collection)”。
消费者流量控制【???】
如果消费者分配了多个分区,并同时消费所有的分区,这些分区具有相同的优先级。在一些情况下,消费者需要首先消费一些指定的分区,当指定的分区有少量或者已经没有可消费的数据时,则开始消费其他分区。 例如流处理,当处理器从 2 个topic获取消息并把这两个topic的消息合并,当其中一个topic长时间落后另一个,则暂停消费,以便落后的赶上来。
kafka 支持动态控制消费流量,分别在 future 的 poll(long) 中使用 pause(Collection) 和 resume(Collection) 来暂停消费指定分配的分区,重新开始消费指定暂停的分区。
读取事务性消息【???】
Kafka 0.11.0 中引入了事务,应用程序可以原子地写入多个主题和分区。为了使之工作,从这些分区读取的消费者应该被配置为只读取已提交的数据,这可以通过在消费者的配置中设置 isolation.level=read_committed 来实现。
在 read_committed 模式下,消费者将只读取那些已经成功提交的事务性消息(像读取非事务性消息一样)。
- 在 read_committed 模式下,没有客户端缓冲。
- 相反,read_committed 消费者的分区的结束偏移量是分区中属于一个事务的第一个消息的偏移量。这个偏移被称为“Last Stable Offset”(LSO,最后稳定偏移)。
一个 read_committed 消费者将只读到 LSO,并过滤掉任何已经中止的事务性消息。LSO 也会影响 read_committed 消费者的“seekToEnd(Collection)”和“endOffsets(Collection)”的行为。
- 最后,对于 read_committed 消费者来说,取数 lag(滞后指标)也被调整为相对 LSO。
带有事务性消息的分区将包括提交或中止标记,这些标记表示事务的结果。那里的标记不会返回给应用程序,但在 log 中却有一个偏移量。
- 因此,应用程序从带有事务消息的主题中读取时,会在消耗的偏移量中看到空白。这些缺失的消息将是事务标记,它们在两个隔离级别中为消费者过滤掉。
- 此外,使用 read_committed 消费者的应用程序也可能会看到由于中止的事务而产生的空隙,因为这些消息不会被消费者返回,但确实是有效的偏移量。
多线程处理【???】
Kafka 消费者不是线程安全的。所有网络I/O都发生在进行调用应用程序的线程中。用户的责任是确保多线程访问正确同步的。
- 非同步访问将导致 ConcurrentModificationException。
此规则唯一的例外是“wakeup()”,它可以安全地从外部线程来中断活动操作。在这种情况下,将从操作的线程阻塞并抛出一个 WakeupException。这可用于从其他线程来关闭消费者。
以下代码段显示了典型模式:【???】
public class KafkaConsumerRunner implements Runnable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private final KafkaConsumer consumer;
public KafkaConsumerRunner(KafkaConsumer consumer) {
this.consumer = consumer;
}
@Override
public void run() {
try {
consumer.subscribe(Arrays.asList("topic"));
while (!closed.get()) {
ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));
// Handle new records
}
} catch (WakeupException e) {
// Ignore exception if closing
if (!closed.get()) throw e;
} finally {
consumer.close();
}
}
// Shutdown hook which can be called from a separate thread
public void shutdown() {
closed.set(true);
consumer.wakeup();
}
}
在单独的线程中,可以通过设置关闭标志和唤醒消费者来关闭消费者。
closed.set(true);
consumer.wakeup();
我们没有多线程模型的例子,但留下几个操作可用来实现多线程处理消息。
- 每个线程一个消费者:每个线程有自己的消费者实例。
- 优点:
- 这是最容易实现的
- 因为它不需要在线程之间协调,所以通常它是最快的。
- 它按顺序处理每个分区(每个线程只处理它接受的消息)。
- 缺点:
- 更多的消费者意味着更多的TCP连接到集群(每个线程一个)。一般kafka处理连接非常的快,所以这是一个小成本。
- 更多的消费者意味着更多的请求被发送到服务器,但稍微较少的数据批次可能导致I/O吞吐量的一些下降。
- 所有进程中的线程总数受到分区总数的限制。
- 解耦消费和处理:一个或多个消费者线程,它来消费所有数据,其消费所有数据并将 ConsumerRecords 实例切换到由实际处理记录处理的处理器线程池来消费的阻塞队列。
- 优点:
- 可扩展消费者和处理进程的数量。这样单个消费者的数据可分给多个处理器线程来执行,避免对分区的任何限制。
- 缺点:
- 跨多个处理器的顺序保证需要特别注意,因为线程是独立的执行,后来的消息可能比遭到的消息先处理,这仅仅是因为线程执行的运气。如果对排序没有问题,这就不是个问题。
- 手动提交变得更困难,因为它需要协调所有的线程以确保处理对该分区的处理完成。
- 这种方法有多种玩法,例如,每个处理线程可以有自己的队列,消费者线程可以使用 TopicPartitionhash 到这些队列中,以确保按顺序消费,并且提交也将简化。
Streams API
在0.10.0增加了一个新的客户端库,Kafka Stream。
- 可以使用maven引入到你的项目中:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.8.0</version> </dependency>
- 当使用Scala时,你可以选择包含kafka-streams-scala库。
- 要使用基于Scala 2.13的Kafka Streams DSL,可以使用以下maven依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams-scala_2.13</artifactId> <version>2.8.0</version> </dependency>
Kafka Streams 从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。
- 可以通过 TopologyBuilder 类定义一个计算逻辑处理器 DAG 拓扑。或者也可以通过提供的高级别 KStream DSL 来定义转换的 KStreamBuilder。(PS:计算逻辑其实就是自己的代码逻辑)【???】
KafkaStreams 类管理 Kafka Streams 实例的生命周期。
- 一个stream实例可以在配置文件中为处理器指定一个或多个Thread。
KafkaStreams 实例可以作为单个 streams 处理客户端(也可能是分布式的),与其他的相同应用 ID 的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。【???】
在内部,KafkaStreams 实例包含一个正常的 KafkaProducer 和 KafkaConsumer 实例,用于读取和写入。
一个简单的例子:
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Connect API
Connect API 实现一个连接器(connector),不断地从一些数据源系统拉取数据到kafka,或从kafka推送到宿系统(sink system)。
大多数Connect使用者不需要直接操作这个API,可以使用之前构建的连接器,不需要编写任何代码。
Admin API
Admin API 用于管理和检查topic、broker、acls和其他Kafka对象。
要使用Admin API,请添加以下Maven依赖关系。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
支持的方法
- alterClientQuotas
- alterConfigs
- alterConsumerGroupOffsets
- alterPartitionReassignments
- alterReplicaLogDirs
- alterUserScramCredentials
- close
- close
- createAcls
- createDelegationToken
- createPartitions
- createTopics
- deleteAcls
- deleteConsumerGroupOffsets
- deleteConsumerGroups
- deleteRecords
- deleteTopics
- describeAcls
- describeClientQuotas
- describeCluster
- describeConfigs
- describeConsumerGroups
- describeDelegationToken
- describeFeatures
- describeLogDirs
- describeReplicaLogDirs
- describeTopics
- describeUserScramCredentials
- describeUserScramCredentials
- electLeaders
- electPreferredLeaders
- electPreferredLeaders
- expireDelegationToken
- incrementalAlterConfigs
- listConsumerGroupOffsets
- listConsumerGroups
- listOffsets
- listPartitionReassignments
- listPartitionReassignments
- listPartitionReassignments
- listPartitionReassignments
- listTopics
- renewDelegationToken
示例
创建Topic
// bootstrapServers 如 localhost:9092
private void createTopics(String bootstrapServers) {
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = AdminClient.create(properties)) {
CreateTopicsResult result = client.createTopics(Arrays.asList(
new NewTopic("topic1", 1, (short) 1),
new NewTopic("topic2", 1, (short) 1),
new NewTopic("topic3", 1, (short) 1)
));
try {
result.all().get();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
}
topic列表
private void listTopics(String bootstrapServers) {
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = AdminClient.create(properties)) {
ListTopicsResult result = client.listTopics();
try {
result.listings().get().forEach(topic -> {
System.out.println(topic);
});
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}
}
输出:
(name=topic1, internal=false)
(name=topic2, internal=false)
(name=topic3, internal=false)
...
增加分区
Properties properties = new Properties();
properties.put("bootstrap.servers", bootstrapServers);
properties.put("connections.max.idle.ms", 10000);
properties.put("request.timeout.ms", 5000);
try (AdminClient client = AdminClient.create(properties)) {
Map newPartitions = new HashMap<>();
// 增加到2个
newPartitions.put("topic1", NewPartitions.increaseTo(2));
CreatePartitionsResult rs = client.createPartitions(newPartitions);
try {
rs.all().get();
} catch (InterruptedException | ExecutionException e) {
throw new IllegalStateException(e);
}
}