查看“Kafka:API”的源代码
←
Kafka:API
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Kafka]] == 关于 == Apache Kafka引入一个新的java客户端(在org.apache.kafka.clients 包中),替代老的Scala客户端,但是为了兼容,将会共存一段时间。 * 为了减少依赖,这些客户端都有一个独立的jar,而旧的Scala客户端继续与服务端保留在同个包下。 Kafka 有 5 个核心 API: # '''Producer API''':允许应用程序发送数据流到kafka集群中的topic。 # '''Consumer API''':允许应用程序从kafka集群的topic中读取数据流。 # '''Streams API''':允许'''从输入topic转换数据流到输出topic'''。【???】 # '''Connect API''':通过实现连接器(connector),不断地'''从一些源系统或应用程序中拉取数据到kafka,或从kafka提交数据到宿系统(sink system)或应用程序'''。【???】 # '''Admin API''':用于'''管理和检查topic,broker和其他Kafka对象'''。 kafka 公开了其所有的功能协议,与语言无关。 只有java客户端作为kafka项目的一部分进行维护,其他的作为开源的项目提供,这里提供了非java客户端的列表。 https://cwiki.apache.org/confluence/display/KAFKA/Clients == Producer API == 我们鼓励所有新开发的程序使用新的Java生产者,新的java生产者客户端比以前的Scala的客户端更快、功能更全面。 引入Maven(可以更改新的版本号): <syntaxhighlight lang="xml" highlight=""> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.8.0</version> </dependency> </syntaxhighlight> === kafka客户端发布record(消息)到kafka集群 === 新的生产者是'''线程安全'''的,在线程之间'''共享单个生产者实例''',通常单例比多个实例要快。 示例,使用producer发送一个有序的key/value(键值对): <syntaxhighlight lang="java" highlight=""> Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 0; i < 100; i++) producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); producer.close(); </syntaxhighlight> 其中: # 生产者的'''缓冲空间池'''保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转换成请求发送到集群。'''如果使用后不关闭生产者,则会丢失这些消息'''。 # '''send()'''方法是'''异步'''的,添加消息到缓冲区等待发送,并立即返回。'''生产者将单个的消息批量在一起发送来提高效率'''。 # '''ack'''是判别请求是否为完整的条件(就是是判断是不是成功发送了)。我们指定了“'''all'''”将会阻塞消息,这种设置'''性能最低,但是最可靠'''。 # '''retries''',如果请求失败,生产者会'''自动重试''',我们指定是0次,如果启用重试,则'''会有重复消息的可能性'''。 # producer(生产者)缓存每个分区未发送的消息。'''缓存的大小'''是通过 '''batch.size''' 配置指定的。值较大的话将会产生更大的批。并需要更多的内存(因为每个“活跃”的分区都有1个缓冲区)。 # 默认缓冲可立即发送,即便缓冲空间还没有满,但是,如果你想减少请求的数量,可以设置 '''linger.ms''' 大于 0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到未满的批中。 #: 这类似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为 1 毫秒,然后,如果我们没有填满缓冲区,这个设置将增加 1 毫秒的延迟请求以等待更多的消息。 #* 需要注意的是,在高负载下,相近的时间一般也会组成批,即使是 linger.ms=0。在不处于高负载的情况下,如果设置比0大,以少量的延迟代价换取更少的、更有效的请求。 # '''buffer.memory''' 控制生产者可用的'''缓存总量''','''如果消息发送速度比其传输到服务器的快,将会耗尽这个缓存空间'''。 #: 当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过 '''max.block.ms''' 设定,之后它将抛出一个 TimeoutException。 # '''key.serializer''' 和 '''value.serializer''' 示例,将用户提供的 key 和 value 对象 ProducerRecord 转换成字节,你可以使用附带的 ByteArraySerializaer 或 StringSerializer 处理简单的 string 或 byte 类型。 == Consumer API == == Streams API == <syntaxhighlight lang="java" highlight=""> </syntaxhighlight> == Connect API == == Admin API ==
返回至“
Kafka:API
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息