“Kafka:API”的版本间差异

来自Wikioe
跳到导航 跳到搜索
无编辑摘要
第31行: 第31行:
</syntaxhighlight>
</syntaxhighlight>


=== kafka客户端发布record(消息)到kafka集群 ===
=== kafka客户端 发布 record(消息) 到 kafka集群 ===
新的生产者是'''线程安全'''的,在线程之间'''共享单个生产者实例''',通常单例比多个实例要快。
新的生产者是'''线程安全'''的,在线程之间'''共享单个生产者实例''',通常单例比多个实例要快。


第55行: 第55行:
其中:
其中:
# 生产者的'''缓冲空间池'''保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。'''如果使用后不关闭生产者,则会丢失这些消息'''。
# 生产者的'''缓冲空间池'''保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。'''如果使用后不关闭生产者,则会丢失这些消息'''。
# '''send()'''方法是'''异步'''的,添加消息到缓冲区等待发送,并立即返回。'''生产者将单个的消息批量在一起发送来提高效率'''。
# '''send()'''方法是'''异步'''的,添加消息到缓冲区等待发送,并立即返回。
# '''ack'''是判别请求是否为完整的条件(就是是判断是不是成功发送了)。我们指定了“'''all'''”将会阻塞消息,这种设置'''性能最低,但是最可靠'''。
#* '''生产者将单个的消息批量在一起发送来提高效率'''。
# '''retries''',如果请求失败,生产者会'''自动重试''',我们指定是0次,如果启用重试,则'''会有重复消息的可能性'''。
# '''ack'''是判别请求是否为完整的条件(就是是判断是不是成功发送了)。
# producer(生产者)缓存每个分区未发送的消息。'''缓存的大小'''是通过 '''batch.size''' 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
#* 指定了“'''all'''”将会阻塞消息,这种设置'''性能最低,但是最可靠'''。
# 默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置 '''linger.ms''' 大于 0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。
# '''retries''',如果请求失败,生产者会'''自动重试''',我们指定是 0 次。
#: 这类似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为 1 毫秒,然后,如果我们没有填满缓冲区,这个设置将增加 1 毫秒的延迟请求以等待更多的消息。
#* 如果启用重试,则'''会有重复消息的可能性'''。
#* 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的、更有效的请求。
# '''batch.size'''指定'''缓存的大小'''。【producer(生产者)缓存每个分区未发送的消息】
#* 值较大的话将会产生更大的批,并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
# '''linger.ms'''指定发送请求逗留时间。
#: 默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置 linger.ms 大于 0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。
#: 这类似于TCP的算法,例如上面的代码段,可能 100 条消息在一个请求发送,因为我们设置了linger(逗留)时间为 1 毫秒,然后,如果我们没有填满缓冲区,这个设置将增加 1 毫秒的延迟请求以等待更多的消息。
#* 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比 0 大,以少量的延迟代价换取更少的、更有效的请求。
# '''buffer.memory''' 控制生产者可用的'''缓存总量''','''如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间'''。
# '''buffer.memory''' 控制生产者可用的'''缓存总量''','''如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间'''。
#: 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 '''max.block.ms''' 设定,之后它将抛出一个 TimeoutException。
#: 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 '''max.block.ms''' 设定,之后它将抛出一个 TimeoutException。
# '''key.serializer''' 和 '''value.serializer''' 示例,将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的 ByteArraySerializaer 或 StringSerializer 处理简单的 string 或 byte 类型。
# '''key.serializer''' 和 '''value.serializer''' 示例,将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的 ByteArraySerializaer 或 StringSerializer 处理简单的 string 或 byte 类型。


=== 幂等和事务 ===
从Kafka 0.11开始,Kafka Producer 又支持两种模式:
# '''幂等生产者''':加强了 Kafka 的交付语义,从至少一次交付到精确一次交付。
#* 幂等生产者特别是生产者的重试将不再引入重复。
# '''事务生产者''':事务性生产者允许应用程序原子地将消息发送到多个分区(和主题!)。




* 要启用幂等(idempotence),必须将“'''enable.idempotence'''”配置设置为'''true'''。
** 如果设置,则“retries”(重试)配置将默认为“Integer.MAX_VALUE”,“acks”配置将默认为“all”。API没有变化,所以无需修改现有应用程序即可利用此功能。
** 此外,如果 send(ProducerRecord) 即使在无限次重试的情况下也会返回错误(例如消息在发送前在缓冲区中过期),那么建议关闭生产者,并检查最后产生的消息的内容,以确保它不重复。
** 最后,生产者只能保证单个会话内发送的消息的幂等性。


* 要使用“事务生产者”和 attendant API,必须设置“'''transactional.id'''”。
** 如果设置了transactional.id,幂等性会和幂等所依赖的生产者配置一起自动启用。
** 此外,应该对包含在事务中的 topic 进行耐久性配置。特别是,“replication.factor”应该至少是 3,而且这些 topic 的“min.insync.replicas”应该设置为 2。
** 最后,为了实现从端到端的事务性保证,消费者也必须配置为只读取已提交的消息。


* transactional.id 的目的是实现单个生产者实例的多个会话之间的事务恢复。它通常是由分区、有状态的应用程序中的分片标识符派生的。因此,它对于在分区应用程序中运行的每个生产者实例来说应该是唯一的。
* 所有新的事务性 API 都是阻塞的,并且会在失败时抛出异常。


下面的例子说明了新的 API 是如何使用的。它与上面的例子类似,只是所有 100 条消息都是一个事务的一部分:
<syntaxhighlight lang="java" highlight="6,9,12,18">
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();
</syntaxhighlight>
如上所示:
# 每个生产者只能有一个未完成的事务。在“beginTransaction()”和“commitTransaction()”调用之间发送的所有消息都将是单个事务的一部分。当指定“transactional.id”时,生产者发送的所有消息都必须是事务的一部分。
# 事务生产者使用异常来传递错误状态。特别是,不需要为“producer.send()”指定回调,也不需要在返回的 Future 上调用 .get():如果任何 producer.send() 或事务性调用在事务过程中遇到不可恢复的错误,就会抛出'''KafkaException'''。
# 该客户端可以与0.10.0或更高版本的 broker 进行通信。旧的或较新的 broker 可能不支持某些客户端功能。例如,事务性API需要 0.11.0或更新版本的broker。当调用在运行的broker版本中不可用的API时,您将收到'''UnsupportedVersionException'''。
=== send() ===
<syntaxhighlight lang="java" highlight="">
public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)
</syntaxhighlight>
异步发送一条消息到 topic,并调用 '''callback'''(当发送已确认)。
# send是'''异步'''的,并且一旦消息被保存在“等待发送的消息缓存”中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。
# 发送的结果是一个 RecordMetadata,它指定了:消息发送的分区,分配的 offset 和消息的时间戳。
#* 如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果topic使用的是 LogAppendTime,则追加消息时,时间戳是broker的本地时间。
Throws:
* InterruptException:如果线程在阻塞中断。
* SerializationException:如果key或value不是给定有效配置的serializers。
* TimeoutException:如果获取元数据或消息分配内存话费的时间超过max.block.ms。
* KafkaException:Kafka有关的错误(不属于公共API的异常)。
由于 send 调用是异步的,它将为分配消息的此消息的 RecordMetadata 返回一个 '''Future'''。'''如果 future 调用 get() 则将阻塞,直到相关请求完成并返回该消息的 metadata,或抛出发送异常'''。【???】




如果要模拟一个简单的阻塞调用,可以调用get()方法:
<syntaxhighlight lang="java" highlight="">
byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();
</syntaxhighlight>
完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知:
<syntaxhighlight lang="java" highlight="">
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
              new Callback() {
                  public void onCompletion(RecordMetadata metadata, Exception e) {
                      if(e != null)
                          e.printStackTrace();
                      System.out.println("The offset of the record we just sent is: " + metadata.offset());
                  }
              });
</syntaxhighlight>
'''发送到同一个分区的消息回调保证按一定的顺序执行''',也就是说,在下面的例子中 callback1 保证执行 callback2 之前:
<syntaxhighlight lang="java" highlight="">
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
</syntaxhighlight>
* 注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的Executor来并行处理。


== Consumer API ==
== Consumer API ==

2021年5月19日 (三) 23:11的版本


关于

Apache Kafka引入一个新的java客户端(在org.apache.kafka.clients 包中),替代老的Scala客户端,但是为了兼容,将会共存一段时间。

  • 为了减少依赖,这些客户端都有一个独立的jar,而旧的Scala客户端继续与服务端保留在同个包下。


Kafka 有 5 个核心 API:

  1. Producer API:允许应用程序发送数据流到kafka集群中的topic。
  2. Consumer API:允许应用程序从kafka集群的topic中读取数据流。
  3. Streams API:允许从输入topic转换数据流到输出topic。【???】
  4. Connect API:通过实现连接器(connector),不断地从一些源系统或应用程序中拉取数据到kafka,或从kafka提交数据到宿系统(sink system)或应用程序。【???】
  5. Admin API:用于管理和检查topic,broker和其他Kafka对象

kafka 公开了其所有的功能协议,与语言无关。

只有java客户端作为kafka项目的一部分进行维护,其他的作为开源的项目提供,这里提供了非java客户端的列表。 https://cwiki.apache.org/confluence/display/KAFKA/Clients

Producer API

我们鼓励所有新开发的程序使用新的Java生产者,新的java生产者客户端比以前的Scala的客户端更快、功能更全面。


引入Maven(可以更改新的版本号):

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.8.0</version>
</dependency>

kafka客户端 发布 record(消息) 到 kafka集群

新的生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。


示例,使用producer发送一个有序的key/value(键值对):

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

其中:

  1. 生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。如果使用后不关闭生产者,则会丢失这些消息
  2. send()方法是异步的,添加消息到缓冲区等待发送,并立即返回。
    • 生产者将单个的消息批量在一起发送来提高效率
  3. ack是判别请求是否为完整的条件(就是是判断是不是成功发送了)。
    • 指定了“all”将会阻塞消息,这种设置性能最低,但是最可靠
  4. retries,如果请求失败,生产者会自动重试,我们指定是 0 次。
    • 如果启用重试,则会有重复消息的可能性
  5. batch.size指定缓存的大小。【producer(生产者)缓存每个分区未发送的消息】
    • 值较大的话将会产生更大的批,并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。
  6. linger.ms指定发送请求逗留时间。
    默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置 linger.ms 大于 0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。
    这类似于TCP的算法,例如上面的代码段,可能 100 条消息在一个请求发送,因为我们设置了linger(逗留)时间为 1 毫秒,然后,如果我们没有填满缓冲区,这个设置将增加 1 毫秒的延迟请求以等待更多的消息。
    • 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比 0 大,以少量的延迟代价换取更少的、更有效的请求。
  7. buffer.memory 控制生产者可用的缓存总量如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间
    当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 max.block.ms 设定,之后它将抛出一个 TimeoutException。
  8. key.serializervalue.serializer 示例,将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的 ByteArraySerializaer 或 StringSerializer 处理简单的 string 或 byte 类型。

幂等和事务

从Kafka 0.11开始,Kafka Producer 又支持两种模式:

  1. 幂等生产者:加强了 Kafka 的交付语义,从至少一次交付到精确一次交付。
    • 幂等生产者特别是生产者的重试将不再引入重复。
  2. 事务生产者:事务性生产者允许应用程序原子地将消息发送到多个分区(和主题!)。


  • 要启用幂等(idempotence),必须将“enable.idempotence”配置设置为true
    • 如果设置,则“retries”(重试)配置将默认为“Integer.MAX_VALUE”,“acks”配置将默认为“all”。API没有变化,所以无需修改现有应用程序即可利用此功能。
    • 此外,如果 send(ProducerRecord) 即使在无限次重试的情况下也会返回错误(例如消息在发送前在缓冲区中过期),那么建议关闭生产者,并检查最后产生的消息的内容,以确保它不重复。
    • 最后,生产者只能保证单个会话内发送的消息的幂等性。
  • 要使用“事务生产者”和 attendant API,必须设置“transactional.id”。
    • 如果设置了transactional.id,幂等性会和幂等所依赖的生产者配置一起自动启用。
    • 此外,应该对包含在事务中的 topic 进行耐久性配置。特别是,“replication.factor”应该至少是 3,而且这些 topic 的“min.insync.replicas”应该设置为 2。
    • 最后,为了实现从端到端的事务性保证,消费者也必须配置为只读取已提交的消息。
  • transactional.id 的目的是实现单个生产者实例的多个会话之间的事务恢复。它通常是由分区、有状态的应用程序中的分片标识符派生的。因此,它对于在分区应用程序中运行的每个生产者实例来说应该是唯一的。
  • 所有新的事务性 API 都是阻塞的,并且会在失败时抛出异常。


下面的例子说明了新的 API 是如何使用的。它与上面的例子类似,只是所有 100 条消息都是一个事务的一部分:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

producer.initTransactions();

try {
    producer.beginTransaction();
    for (int i = 0; i < 100; i++)
        producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // We can't recover from these exceptions, so our only option is to close the producer and exit.
    producer.close();
} catch (KafkaException e) {
    // For all other exceptions, just abort the transaction and try again.
    producer.abortTransaction();
}
producer.close();

如上所示:

  1. 每个生产者只能有一个未完成的事务。在“beginTransaction()”和“commitTransaction()”调用之间发送的所有消息都将是单个事务的一部分。当指定“transactional.id”时,生产者发送的所有消息都必须是事务的一部分。
  2. 事务生产者使用异常来传递错误状态。特别是,不需要为“producer.send()”指定回调,也不需要在返回的 Future 上调用 .get():如果任何 producer.send() 或事务性调用在事务过程中遇到不可恢复的错误,就会抛出KafkaException
  3. 该客户端可以与0.10.0或更高版本的 broker 进行通信。旧的或较新的 broker 可能不支持某些客户端功能。例如,事务性API需要 0.11.0或更新版本的broker。当调用在运行的broker版本中不可用的API时,您将收到UnsupportedVersionException

send()

public Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)

异步发送一条消息到 topic,并调用 callback(当发送已确认)。

  1. send是异步的,并且一旦消息被保存在“等待发送的消息缓存”中,此方法就立即返回。这样并行发送多条消息而不阻塞去等待每一条消息的响应。
  2. 发送的结果是一个 RecordMetadata,它指定了:消息发送的分区,分配的 offset 和消息的时间戳。
    • 如果topic使用的是CreateTime,则使用用户提供的时间戳或发送的时间(如果用户没有指定指定消息的时间戳)如果topic使用的是 LogAppendTime,则追加消息时,时间戳是broker的本地时间。

Throws:

  • InterruptException:如果线程在阻塞中断。
  • SerializationException:如果key或value不是给定有效配置的serializers。
  • TimeoutException:如果获取元数据或消息分配内存话费的时间超过max.block.ms。
  • KafkaException:Kafka有关的错误(不属于公共API的异常)。


由于 send 调用是异步的,它将为分配消息的此消息的 RecordMetadata 返回一个 Future如果 future 调用 get() 则将阻塞,直到相关请求完成并返回该消息的 metadata,或抛出发送异常。【???】


如果要模拟一个简单的阻塞调用,可以调用get()方法:

byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();

完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知:

ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
              new Callback() {
                  public void onCompletion(RecordMetadata metadata, Exception e) {
                      if(e != null)
                          e.printStackTrace();
                      System.out.println("The offset of the record we just sent is: " + metadata.offset());
                  }
              });


发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在下面的例子中 callback1 保证执行 callback2 之前:

producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);
  • 注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则将延迟其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议在callback主体中使用自己的Executor来并行处理。

Consumer API

Streams API


Connect API

Admin API