Kafka:安装、配置
单机安装
Kafka 解压即用,并没有繁琐的安装步骤,唯一注意的是其需要 Zookeeper 支持(但其自带有 Zookeeper)。
- 官网下载压缩包:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
- 提取 tar 文件:
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
- 启动服务器:
$ cd kafka_2.11.0.9.0.0 $ bin/kafka-server-start.sh config/server.properties
- 服务器启动后,会得到以下响应:
$ bin/kafka-server-start.sh config/server.properties [2016-01-02 15:37:30,410] INFO KafkaConfig values: request.timeout.ms = 30000 log.roll.hours = 168 inter.broker.protocol.version = 0.9.0.X log.preallocate = false security.inter.broker.protocol = PLAINTEXT ……………………………………………. …………………………………………….
- 停止服务器:
$ bin/kafka-server-stop.sh config/server.properties
NOTE:
- 启动 kafka 需要执行 kafka-server-start.bat 文件,需要传入一个路径参数(server.config 文件的路径):
- 如果使用自行安装的 Zookeeper(先启动 Zookeeper,再启动 Kafka),使用“config/server.properties”。
- 如果使用 Kafka 自带的 Zookeeper(直接启动 Kafka 即可),使用:“config/zookeeper.properties”。
- properties 文件都位于“../kafka_x.xx.x.x.x.x/config”中。
使用
打开另一个命令终端启动kafka服务:
> bin/kafka-server-start.sh config/server.properties &
- 创建一个主题(topic):
- 创建一个名为“test”的Topic,只有一个分区和一个备份:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
- 创建好之后,可以通过运行以下命令,查看已创建的topic信息:
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs: Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- 或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时会自动创建topic。
- 发送消息:
- Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
- 运行 producer(生产者),然后在控制台输入几条消息到服务器:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
- 消费消息:
- Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来,新打开一个命令控制台,输入:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
- 如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。
- 使用 Kafka Connect 来 导入/导出 数据:
- 你可能在现有的系统中拥有大量的数据,如关系型数据库或传统的消息传递系统,以及许多已经使用这些系统的应用程序。Kafka Connect允许你不断地从外部系统提取数据到Kafka,反之亦然。用Kafka整合现有的系统是非常容易的。为了使这个过程更加容易,有数百个这样的连接器现成可用。
- 见:“Kafka:Connect”
- 使用Kafka Stream来处理数据:
- 一旦你的数据存储在Kafka中,你就可以用Kafka Streams客户端库来处理这些数据,该库适用于Java/Scala。它允许你实现自己的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka主题中。Kafka Streams将在客户端编写和部署标准Java和Scala应用程序的简单性与Kafka服务器端集群技术的优势相结合,使这些应用程序具有可扩展性、弹性、容错性和分布式。该库支持精确的一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
- 实现流行的 WordCount 算法【???】:
KStream<String, String> textLines = builder.stream("quickstart-events"); KTable<String, Long> wordCounts = textLines .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" "))) .groupBy((keyIgnored, word) -> word) .count(); wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
- 停止Kafka:
- 使用 Ctrl-C 停止生产者和消费者客户端。
- 使用 Ctrl-C 停止 Kafka broker。
- 最后,使用 Ctrl-C 停止 ZooKeeper。
- 如果你还想删除你的本地Kafka环境的数据,包括你创建的消息,运行命令。
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
集群搭建
背景:如安装所示,已在集群机器上安装了 Kafka。
设置多个broker集群:
- 为每个broker创建一个配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
- 为不同broker修改配置文件:
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1
config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
- broker.id 是集群中每个节点的唯一且永久的名称。
- 【修改端口和日志目录是因为,现在在同一台机器上运行,要防止broker在同一端口上注册和覆盖对方的数据】
- 在启动新的kafka节点:
> bin/kafka-server-start.sh config/server-1.properties & ...
> bin/kafka-server-start.sh config/server-2.properties & ...
使用
- 创建一个新topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
- 命令“describe topics”,查看集群:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
- 其中:第一行是所有分区的摘要,其次,每一行提供一个分区信息(因为我们只有一个分区,所以只有一行)。
- “Leader”:该节点负责该分区的所有的读和写(每个节点的leader都是随机选择的)。
- “Replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
- “Isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader。
- 发布信息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
- 消费消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
- 集群容错测试:
- kill掉leader,Broker1作为当前的leader,也就是kill掉Broker1。
> ps | grep server-1.properties 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java... > kill -9 7564
- 在Windows上使用:
> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid ProcessId 6016 > taskkill /pid 6016 /f
- 命令“describe topics”,查看集群:
> binbin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
- 备份节点之一成为新的leader,而broker1已经不在同步备份集合里了。
- 消费消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
- 如上,消息仍然没丢。
- kill掉leader,Broker1作为当前的leader,也就是kill掉Broker1。
推荐配置
最重要的 producer 配置控制:
- 压缩
- 同步生产 vs 异步生产
- 批处理大小(异步生产)
最重要的 consumer 配置:
- 获取消息的大小
生产者服务器配置
服务器生产服务器配置:
# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
controller.socket.timeout.ms=30000
controller.message.queue.size=10
# Log configuration
num.partitions=8
message.max.bytes=1000000
auto.create.topics.enable=true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=168
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
新版本推荐:
# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]
# Log configuration
num.partitions=8
default.replication.factor=3
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]
# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]
Java版本
从安全的角度,我们推荐你使用最新的发布版本 JDK 1.8,旧版本已经公开披露了一些安全漏洞,LinkedIn 现在正在运行的是JDK 1.8 u5(希望升级到新版本)使用 G1 收集器。 如果你想在在 JDK 1.7 使用G1收集器(当前默认),请确保在 u51 或更高的版本,LinkedIn 尝试在 u21 测试,但该版本存在大量 G1 执行的问题。
LinkedIn的配置如下:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
供参考,下面是关于 LinkedIn 的繁忙集群 (高峰) 之一的统计:
- 60 brokers
- 50k 分区 (副本 2)
- 800k 消息/秒
- 300 MB/sec的入站, 1 GB/秒+ 出站
这个调整看来相当激进, 但是集群中的有 90% 的 GC 暂停时间大约是 21ms, 以及每秒小于 1 个的年轻代 GC。
配置
Broker
基本配置如下:
- broker.id
- log.dirs
- zookeeper.connect
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 | 更新模式 |
---|---|---|---|---|---|---|
kafka >= 0.10版 | ||||||
zookeeper.connect | zookeeper host string | string | 高 | |||
advertised.host.name | 过时的:当advertised.listeners或listeners没设置时候才使用。
|
string | null | 高 | ||
advertised.listeners | 发布到Zookeeper供客户端使用监听(如果不同)。在IaaS环境中,broker可能需要绑定不同的接口。如果没设置,则使用listeners。 | string | null | 高 | ||
advertised.port | 过时的:当advertised.listeners或listeners没有设置才使用。
|
int | null | 高 | ||
auto.create.topics.enable | 启用自动创建topic | boolean | true | 高 | ||
auto.leader.rebalance.enable | 启用自动平衡leader。如果需要,后台线程会定期检查并触发leader平衡。 | boolean | true | 高 | ||
background.threads | 用于各种后台处理任务的线程数 | int | 10 | [1,...] | 高 | |
broker.id | 服务器的broker id。
要避免zookeeper生成的broker id和用户配置的broker id冲突,从reserved.broker.max.id + 1开始生成。 |
int | -1 | 高 | ||
compression.type | 为给定topic指定最终的压缩类型。支持标准的压缩编码器('gzip', 'snappy', 'lz4')。也接受'未压缩',就是没有压缩。保留由producer设置的原始的压缩编码。 | string | producer | 高 | ||
delete.topic.enable | 启用删除topic。如果此配置已关闭,通过管理工具删除topic将没有任何效果 | boolean | false | 高 | ||
host.name | 过时的:当listeners没有设置才会使用。
|
string | "" | 高 | ||
leader.imbalance.check.interval.seconds | 由控制器触发分区再平衡检查的频率 | long | 300 | 高 | ||
leader.imbalance.per.broker.percentage | 允许每个broker的leader比例不平衡。如果每个broker的值高于此值,控制器将触发leader平衡,该值以百分比的形式指定。 | int | 10 | 高 | ||
listeners | 监听列表 - 监听逗号分隔的URL列表和协议。
|
string | null | 高 | ||
log.dir | 保存日志数据的目录 (补充log.dirs属性) | string | /tmp/kafka-logs | 高 | ||
log.dirs | 保存日志数据的目录。如果未设置,则使用log.dir中的值 | string | null | 高 | ||
log.flush.interval.messages | 消息刷新到磁盘之前,累计在日志分区的消息数 | long | 9223372036854775807 | [1,...] | 高 | |
log.flush.interval.ms | topic中的消息在刷新到磁盘之前保存在内存中的最大时间(以毫秒为单位),如果未设置,则使用log.flush.scheduler.interval.ms中的值 | null | 高 | |||
log.flush.offset.checkpoint.interval.ms | 我们更新的持续记录的最后一次刷新的频率。作为日志的恢复点。 | int | 60000 | [0,...] | 高 | |
log.flush.scheduler.interval.ms | 日志刷新的频率(以毫秒为单位)检查是否有任何日志需要刷新到磁盘 | long | 9223372036854775807 | 高 | ||
log.retention.bytes | 删除日志之前的最大大小 | long | -1 | 高 | ||
log.retention.hours | 删除日志文件保留的小时数(以小时为单位)。第三级是log.retention.ms属性 | int | 168 | 高 | ||
log.retention.minutes | 删除日志文件之前保留的分钟数(以分钟为单位)。次于log.retention.ms属性。如果没设置,则使用log.retention.hours的值。 | int | null | 高 | ||
log.retention.ms | 删除日志文件之前保留的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes的值。 | long | null | 高 | ||
log.roll.hours | 新建一个日志段的最大时间(以小时为单位),次于log.roll.ms属性 | int | 168 | [1,...] | 高 | |
log.roll.jitter.hours | 从logRollTimeMillis(以小时为单位)减去最大抖动,次于log.roll.jitter.ms属性。 | int | 0 | [0,...] | 高 | |
log.roll.ms | 新建一个日志段之前的最大事时间(以毫秒为单位)。如果未设置,则使用log.roll.hours的值。 | long | null | 高 | ||
log.segment.bytes | 单个日志文件的最大大小 | int | 1073741824 | [14,...] | 高 | |
log.segment.delete.delay.ms | 从文件系统中删除文件之前的等待的时间 | long | 60000 | [0,...] | 高 | |
message.max.bytes | 服务器可以接收的消息的最大大小 | int | 1000012 | [0,...] | 高 | |
min.insync.replicas | 当producer设置acks为"all"(或"-1")时。min.insync.replicas指定必须应答成功写入的replicas最小数。
|
int | 1 | [1,...] | 高 | |
num.io.threads | 服务器用于执行网络请求的io线程数 | int | 8 | [1,...] | 高 | |
num.network.threads | 服务器用于处理网络请求的线程数。 | int | 3 | [1,...] | 高 | |
num.recovery.threads.per.data.dir | 每个数据的目录线程数,用于启动时日志恢复和关闭时flush。 | int | 1 | [1,...] | 高 | |
num.replica.fetchers | 从源broker复制消息的提取线程数。递增该值可提高 follower broker的I/O的并发。 | int | 1 | 高 | ||
offset.metadata.max.bytes | offset提交关联元数据条目的最大大小 | int | 4096 | 高 | ||
offsets.commit.required.acks | commit之前需要的应答数,通常,不应覆盖默认的(-1) | short | -1 | 高 | ||
offsets.commit.timeout.ms | Offset提交延迟,直到所有副本都收到提交或超时。 这类似于生产者请求超时。 | int | 5000 | [1,...] | 高 | |
offsets.load.buffer.size | 当加载offset到缓存时,从offset段读取的批量大小。 | int | 5242880 | [1,...] | 高 | |
offsets.retention.check.interval.ms | 检查过期的offset的频率。 | long | 600000 | [1,...] | 高 | |
offsets.retention.minutes | offset topic的日志保留时间(分钟) | int | 1440 | [1,...] | 高 | |
offsets.topic.compression.codec | 压缩编码器的offset topic - 压缩可以用于实现“原子”提交 | int | 0 | 高 | ||
offsets.topic.num.partitions | offset commit topic的分区数(部署之后不应更改) | int | 50 | [1,...] | 高 | |
offsets.topic.replication.factor | offset topic复制因子(ps:就是备份数,设置的越高来确保可用性)。
为了确保offset topic有效的复制因子,第一次请求offset topic时,活的broker的数量必须最少最少是配置的复制因子数。 如果不是,offset topic将创建失败或获取最小的复制因子(活着的broker,复制因子的配置) |
short | 3 | [1,...] | 高 | |
offsets.topic.segment.bytes | offset topic段字节应该相对较小一点,以便于加快日志压缩和缓存加载 | int | 104857600 | [1,...] | 高 | |
port | 过时的:当listener没有设置才使用。请改用listeners。该port监听和接收连接。 | int | 9092 | 高 | ||
queued.max.requests | 在阻塞网络线程之前允许的排队请求数 | int | 500 | [1,...] | 高 | |
quota.consumer.default | 过时的:当默认动态的quotas没有配置或在Zookeeper时。如果每秒获取的字节比此值高,所有消费者将通过clientId/consumer区分限流。 | long | 9223372036854775807 | [1,...] | 高 | |
quota.producer.default | 过时的:当默认动态的quotas没有配置,或在zookeeper时。如果生产者每秒比此值高,所有生产者将通过clientId区分限流。 | long | 9223372036854775807 | [1,...] | 高 | |
replica.fetch.min.bytes Minimum | 每个获取响应的字节数。如果没有满足字节数,等待replicaMaxWaitTimeMs。 | int | 1 | 高 | ||
replica.fetch.wait.max.ms | 跟随者副本发出每个获取请求的最大等待时间,此值应始终小于replica.lag.time.max.ms,以防止低吞吐的topic的ISR频繁的收缩。 | int | 500 | 高 | ||
replica.high.watermark.checkpoint.interval.ms | 达到高“水位”保存到磁盘的频率。 | long | 5000 | 高 | ||
replica.lag.time.max.ms | 如果一个追随者没有发送任何获取请求或至少在这个时间的这个leader的没有消费完。该leader将从isr中移除这个追随者。 | long | 10000 | 高 | ||
replica.socket.receive.buffer.bytes | 用于网络请求的socket接收缓存区 | int | 65536 | 高 | ||
replica.socket.timeout.ms | 网络请求的socket超时,该值最少是replica.fetch.wait.max.ms | int | 30000 | 高 | ||
request.timeout.ms | 该配置控制客户端等待请求的响应的最大时间。
如果超过时间还没收到消费。客户端将重新发送请求,如果重试次数耗尽,则请求失败。 |
int | 30000 | 高 | ||
socket.receive.buffer.bytes | socket服务的SO_RCVBUF缓冲区。如果是-1,则默认使用OS的。 | int | 102400 | 高 | ||
socket.request.max.bytes | socket请求的最大字节数 | int | 104857600 | [1,...] | 高 | |
socket.send.buffer.bytes | socket服务的SO_SNDBUF缓冲区。如果是-1,则默认使用OS的。 | int | 102400 | 高 | ||
unclean.leader.election.enable | 是否启用不在ISR中的副本参与选举leader的最后的手段。这样做有可能丢失数据。 | boolean | true | 高 | ||
zookeeper.connection.timeout.ms | 连接zookeeper的最大等待时间,如果未设置,则使用zookeeper.session.timeout.ms。 | int | null | 高 | ||
zookeeper.session.timeout.ms | Zookeeper会话的超时时间 | int | 6000 | 高 | ||
zookeeper.set.acl | 设置客户端使用安全的ACL | boolean | false | 高 | ||
broker.id.generation.enable | 启用自动生成broker id。启用该配置时应检查reserved.broker.max.id。 | boolean | true | 中 | ||
broker.rack | broker机架,用于机架感知副本分配的失败容错。例如:RACK1, us-east-1d | string | null | 中 | ||
connections.max.idle.ms Idle | 连接超时:闲置时间超过该设置,则服务器socket处理线程关闭这个连接。 | long | 600000 | 中 | ||
controlled.shutdown.enable | 启用服务器的关闭控制。 | boolean | true | 中 | ||
controlled.shutdown.max.retries | 控制因多种原因导致的shutdown失败,当这样失败发生,尝试重试的次数 | int | 3 | 中 | ||
controlled.shutdown.retry.backoff.ms | 在每次重试之前,系统需要时间从导致先前故障的状态(控制器故障转移,复制延迟等)恢复。 此配置是重试之前等待的时间数。 | long | 5000 | 中 | ||
controller.socket.timeout.ms | 控制器到broker通道的sockt超时时间 | int | 30000 | 中 | ||
default.replication.factor | 自动创建topic的默认的副本数 | int | 1 | 中 | ||
fetch.purgatory.purge.interval.requests | 拉取请求清洗间隔(请求数) | int | 1000 | 中 | ||
group.max.session.timeout.ms | 已注册的消费者允许的最大会话超时时间,设置的时候越长使消费者有更多时间去处理心跳之间的消息。但察觉故障的时间也拉长了。 | int | 300000 | 中 | ||
group.min.session.timeout.ms | 已经注册的消费者允许最小的会话超时时间,更短的时间去快速的察觉到故障,代价是频繁的心跳,这可能会占用大量的broker资源。 | int | 6000 | 中 | ||
inter.broker.protocol.version | 指定broker内部通讯使用的版本。通常在更新broker时使用。有效的值为:0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1。查看ApiVersion找到的全部列表。 | string | 0.10.1-IV2 | 中 | ||
log.cleaner.backoff.ms | 当没有日志要清理时,休眠的时间 | long | 15000 | [0,...] | 中 | |
log.cleaner.dedupe.buffer.size | 用于日志去重的内存总量(所有cleaner线程) | long | 134217728 | 中 | ||
log.cleaner.delete.retention.ms | 删除记录保留多长时间? | long | 86400000 | 中 | ||
log.cleaner.enable | 在服务器上启用日志清洗处理?如果使用的任何topic的cleanup.policy=compact包含内部的offset topic,应启动。如果禁用,那些topic将不会被压缩并且会不断的增大。 | boolean | true | 中 | ||
log.cleaner.io.buffer.load.factor | 日志cleaner去重缓冲负载因子。去重缓冲区的百分比,较高的值将允许同时清除更多的日志,但将会导致更多的hash冲突。 | double | 0.9 | 中 | ||
log.cleaner.io.buffer.size | 所有日志清洁器线程I/O缓存的总内存 | int | 524288 | [0,...] | 中 | |
log.cleaner.io.max.bytes.per.second | 日志清理器限制,以便其读写i/o平均小与此值。 | double | 1.7976931348623157E308 | 中 | ||
log.cleaner.min.cleanable.ratio | 脏日志与日志的总量的最小比率,以符合清理条件 | double | 0.5 | 中 | ||
log.cleaner.min.compaction.lag.ms | 一条消息在日志保留不压缩的最小时间,仅适用于正在压缩的日志。 | long | 0 | 中 | ||
log.cleaner.threads | 用于日志清除的后台线程数 | int | 1 | [0,...] | 中 | |
log.cleanup.policy | 超过保留时间段的默认清除策略。逗号分隔的有效的策略列表。有效的策略有:“delete”和“compact” | list | [delete] | [compact, delete] | 中 | |
log.index.interval.bytes | 添加一个条目到offset的间隔 | int | 4096(4 kibibytes) | [0,...] | 中 | |
log.index.size.max.bytes | offset index的最大大小(字节) | int | 10485760 | [4,...] | 中 | |
log.message.format.version | 指定追加到日志中的消息格式版本。例如: 0.8.2, 0.9.0.0, 0.10.0。
通过设置一个特定消息格式版本,用户需要保证磁盘上所有现有的消息小于或等于指定的版本。错误的设置将导致旧版本的消费者中断,因为消费者接收一个不理解的消息格式。 |
string | 0.10.1-IV2 | 中 | ||
log.message.timestamp.difference.max.ms | 如果log.message.timestamp.type=CreateTime,broker接收消息时的时间戳和消息中指定的时间戳之间允许的最大差异。
|
long | 9223372036854775807 | [0,...] | 中 | |
log.message.timestamp.type | 定义消息中的时间戳是消息创建时间或日志追加时间。该值可设置为CreateTime 或 LogAppendTime | string | CreateTime | [CreateTime, LogAppendTime] | 中 | |
log.preallocate | 在创建新段时预分配文件?如果你在Windowns上使用kafka,你可能需要设置它为true。 | boolean | false | 中 | ||
log.retention.check.interval.ms | 日志清除程序检查日志是否满足被删除的频率(以毫秒为单位) | long | 300000 | [1,...] | 中 | |
max.connections.per.ip | 允许每个ip地址的最大连接数。 | int | 2147483647 | [1,...] | 中 | |
max.connections.per.ip.overrides | per-ip或hostname覆盖默认最大连接数 | string | "" | 中 | ||
num.partitions | topic的默认分区数 | int | 1 | [1,...] | 中 | |
principal.builder.class | 实现PrincipalBuilder接口类的完全限定名,该接口目前用于构建与SSL SecurityProtocol连接的Principal。 | class | class org。apache。kafka。common。security。auth。DefaultPrincipalBuilder(wiki傻逼编辑器:。换位.) | 中 | ||
producer.purgatory.purge.interval.requests | 生产者请求purgatory的清洗间隔(请求数) | int | 1000 | 中 | ||
replica.fetch.backoff.ms | 当拉取分区发生错误时休眠的时间 | int | 1000 | [0,...] | 中 | |
replica.fetch.max.bytes | 拉取每个分区的消息的字节数。这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 | int | 1048576 | [0,...] | 中 | |
replica.fetch.response.max.bytes | 预计整个获取响应的最大字节数,这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。
通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 |
int | 10485760 | [0,...] | 中 | |
reserved.broker.max.id | broker.id的最大数 | int | 1000 | [0,...] | 中 | |
sasl.enabled.mechanisms | 可用的SASL机制列表,包含任何可用的安全提供程序的机制。默认情况下只有GSSAPI是启用的。 | list | [GSSAPI] | 中 | ||
sasl.kerberos.kinit.cmd | Kerberos kinit 命令路径。 | string | /usr/bin/kinit | 中 | ||
sasl.kerberos.min.time.before.relogin | 登录线程在刷新尝试的休眠时间。 | long | 60000 | 中 | ||
sasl.kerberos.principal.to.local.rules | principal名称映射到一个短名称(通常是操作系统用户名)。按顺序,使用与principal名称匹配的第一个规则将其映射其到短名称。忽略后面的规则。
默认情况下,{username}/{hostname}@{REALM} 映射到 {username}。 |
list | [DEFAULT] | 中 | ||
sasl.kerberos.service.name | Kafka运行的Kerberos principal名称。可以在JAAS或Kafka的配置文件中定义。 | string | null | 中 | ||
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动的百分比 | time. double | 0.05 | 中 | ||
sasl.kerberos.ticket.renew.window.factor | 登录线程休眠,直到从上次刷新到ticket的到期的时间已到达(指定窗口因子),在此期间它将尝试更新ticket。 | double | 0.8 | 中 | ||
sasl.mechanism.inter.broker.protocol | SASL机制,用于broker之间的通讯,默认是GSSAPI。 | string | GSSAPI | 中 | ||
security.inter.broker.protocolSecurity | broker之间的通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 | string | PLAINTEXT | 中 | ||
ssl.cipher.suites | 密码套件列表。认证,加密,MAC和秘钥交换算法的组合,用于使用TLS或SSL的网络协议交涉网络连接的安全设置,默认情况下,支持所有可用的密码套件。 | list | null | 中 | ||
ssl.client.auth | 配置请求客户端的broker认证。
常见的设置:
|
string | none | [required, requested, none] | 中 | |
ssl.enabled.protocols | 已启用的SSL连接协议列表。 | list | [TLSv1.2, TLSv1.1, TLSv1] | 中 | ||
ssl.key.password | 秘钥库文件中的私钥密码。对客户端是可选的。 | password | null | 中 | ||
ssl.keymanager.algorithm | 用于SSL连接的密钥管理工厂算法。默认值是Java虚拟机的密钥管理工厂算法。 | string | SunX509 | 中 | ||
ssl.keystore.location | 密钥仓库文件的位置。客户端可选,并可用于客户端的双向认证。 | string | null | 中 | ||
ssl.keystore.password | 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 | password | null | 中 | ||
ssl.keystore.type | 密钥仓库文件的格式。客户端可选。 | string | JKS | 中 | ||
ssl.protocol | 用于生成SSLContext,默认是TLS,适用于大多数情况。允许使用最新的JVM,LS, TLSv1.1 和TLSv1.2。 SSL,SSLv2和SSLv3 老的JVM也可能支持,由于有已知的安全漏洞,不建议使用。 | string | TLS | 中 | ||
ssl.provider | 用于SSL连接的安全提供程序的名称。默认值是JVM的安全程序。 | string | null | 中 | ||
ssl.trustmanager.algorithm | 信任管理工厂用于SSL连接的算法。默认为Java虚拟机配置的信任算法。 | string | PKIX | 中 | ||
ssl.truststore.location | 信任仓库文件的位置 | string | null | 中 | ||
ssl.truststore.password | 信任仓库文件的密码 | password | null | 中 | ||
ssl.truststore.type | 信任仓库文件的文件格式 | string | JKS | 中 | ||
authorizer.class.name | 用于认证的授权程序类 | string | "" | 低 | ||
metric.reporters | 度量报告的类列表,通过实现MetricReporter接口,允许插入新度量标准类。JmxReporter包含注册JVM统计。 | list | [] | 低 | ||
metrics.num.samples | 维持计算度量的样本数。 | int | 2 | [1,...] | 低 | |
metrics.sample.window.ms | 计算度量样本的时间窗口 | long | 30000 | [1,...] | 低 | |
quota.window.num | 在内存中保留客户端限额的样本数 | int | 11 | [1,...] | 低 | |
quota.window.size.seconds | 每个客户端限额的样本时间跨度 | int | 1 | [1,...] | 低 | |
replication.quota.window.num | 在内存中保留副本限额的样本数 | int | 11 | [1,...] | 低 | |
replication.quota.window.size.seconds | 每个副本限额样本数的时间跨度 | int | 1 | [1,...] | 低 | |
ssl.endpoint.identification.algorithm | 端点身份标识算法,使用服务器证书验证服务器主机名。 | string | null | 低 | ||
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | 低 | ||
zookeeper.sync.time.ms | ZK follower可落后与leader多久。 | int | 2000 | 低 | ||
以下是kafka新版本的增量配置 | ||||||
kafka >= 1.0 | ||||||
group.initial.rebalance.delay.ms | 分组协调器在执行第一次重新平衡之前,等待更多消费者加入新组的时间。延迟时间越长,意味着重新平衡的次数可能越少,但会增加处理开始前的时间。 | int | 3000 | 中 | 只读 | |
transaction.abort.timed.out.transaction.cleanup.interval.ms | 回滚已超时的事务的时间间隔。 | int | 10000 (10 seconds) | [1,...] | 低 | 只读 |
transaction.remove.expired.transaction.cleanup.interval.ms | 删除因transactional.id.expiration.ms过期的事务的时间间隔。 | int | 3600000 (1 hour) | [1,...] | 低 | 只读 |
transaction.max.timeout.ms | 事务的最大允许超时时间。如果客户端请求的交易时间超过了这个时间,那么broker将在InitProducerIdRequest中返回一个错误。
这可以防止客户端的超时时间过大,从而阻滞消费者从事务中包含的主题中读取。 |
int | 900000 (15 minutes) | [1,...] | 高 | 只读 |
transaction.state.log.load.buffer.size | 在将生产者id和事务加载到缓存中时,从事务日志段读取的批次大小(软限制,如果消息太大,则重写) | int | 5242880 | [1,...] | 高 | 只读 |
transaction.state.log.min.isr | 覆盖事务topic的min.insync.replicas配置。 | int | 2 | [1,...] | 高 | 只读 |
transaction.state.log.num.partitions | 事务topic的分区数(部署后不应改变)。 | int | 50 | [1,...] | 高 | 只读 |
transaction.state.log.replication.factor | 事务topic的复制因子(设置较高来确保可用性)。内部topic创建将失败,直到集群规模满足该复制因子要求。 | short | 3 | [1,...] | 高 | 只读 |
transaction.state.log.segment.bytes | 事务topic段的字节数应保持相对较小,以利于加快日志压缩和缓存加载速度 | int | 104857600 (100 mebibytes) | [1,...] | 高 | 只读 |
transactional.id.expiration.ms | 事务协调器在没有收到当前事务的任何事务状态更新的情况下,在其事务id过期前等待的时间,单位为ms。
这个设置也会影响生产者id过期:一旦这个时间在给定的生产者id最后一次写入后过去,生产者id就会过期。
|
int | 604800000 (7 days) | [1,...] | 高 | 只读 |
kafka >= 2.0 | ||||||
sasl.client.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 | class | null | 中间 | 只读 | |
sasl.login.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。
例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class | null | 中间 | 只读 | |
sasl.login.class | 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。
例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。 |
class | null | 中间 | 只读 | |
kafka >= 2.5 | ||||||
zookeeper.clientCnxnSocket | 当使用TLS连接到ZooKeeper时,通常设置为org.apache.zookeeper.ClientCnxnSocketNetty。
覆盖任何同名的zookeeper.clientCnxnSocket设置的显式值。 |
string | null | 中间 | 只读 | |
zookeeper.ssl.client.enable | 设置客户端连接到ZooKeeper时使用TLS。
显式的值会覆盖任何通过zookeeper.client.secure设置的值(注意名称不同)。 如果两者都没有设置,默认为false; 当为true时,必须设置zookeeper.clientCnxnSocket(通常为org.apache.zookeeper.ClientCnxnSocketNetty); 其他需要设置的值可能包括zookeeper.ssl.cipher.suites、zookeeper.ssl.crl.enable、zookeeper.ssl.enabled.protocols、zookeeper.ssl.endpoint. identification.algorithm,zookeeper.ssl.keystore.location,zookeeper.ssl.keystore.password,zookeeper.ssl.keystore.type,zookeeper.ssl. ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type。 |
boolean | false | 中间 | 只读 | |
zookeeper.ssl.keystore.location | 当使用客户端证书与TLS连接到ZooKeeper时的keystore位置。
覆盖任何通过zookeeper.ssl.keyStore.location系统属性设置的显式值(注意是驼峰大小)。 |
password | null | 中间 | 只读 | |
zookeeper.ssl.keystore.password | 当使用客户端证书与TLS连接到ZooKeeper时的keystore密码。覆盖任何通过`zookeeper.ssl.keyStore.password系统属性设置的显式值(注意驼峰大写)。 注意,ZooKeeper不支持与keystore密码不同的密钥密码,所以一定要将keystore中的密钥密码设置为与keystore密码相同,否则连接Zookeeper的尝试将失败。 | password | null | 中间 | 只读 | |
zookeeper.ssl.keystore.type | 当使用客户端证书与TLS连接到ZooKeeper时的keystore类型。覆盖任何通过zookeeper.ssl.keyStore.type系统属性设置的显式值(注意骆驼大写)。
默认值为null意味着该类型将根据keystore的文件扩展名自动检测。 |
string | null | 中间 | 只读 | |
zookeeper.ssl.protocol | 指定ZooKeeper TLS协商中使用的协议。
一个显式的值会覆盖任何通过同名的zookeeper.ssl.protocol系统设置的值。 |
string | TLSv1.2 | 低 | 只读 | |
zookeeper.ssl.cipher.suites | 指定在ZooKeeper TLS协商中使用的密码套件(csv),覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意单字 "ciphersuites")。
覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意 "ciphersuites "这个单字)。 默认值为 "null "意味着启用的密码套件列表是由正在使用的Java运行时决定的。 |
boolean | false | 低 | 只读 | |
zookeeper.ssl.crl.enable | 指定是否启用ZooKeeper TLS协议中的证书撤销列表。
覆盖任何通过zookeeper.ssl.crl系统属性设置的显式值(注意是短名)。 |
boolean | false | 低 | 只读 | |
zookeeper.ssl.enabled.protocols | 指定ZooKeeper TLS协商(csv)中启用的协议。
覆盖任何通过zookeeper.ssl.enabledProtocols系统属性设置的显式值(注意骆驼大写)。 默认值为 "null "意味着启用的协议将是zookeeper.ssl.protocol配置属性的值。 |
list | null | 低 | 只读 | |
zookeeper.ssl.endpoint.identification.algorithm | 指定是否在ZooKeeper TLS协商过程中启用主机名验证,(不区分大小写)"https "表示启用ZooKeeper主机名验证,显式的空白值表示禁用(仅为测试目的建议禁用)。
明确的值会覆盖任何通过zookeeper.ssl.hostnameVerification系统属性设置的 "true "或 "false "值(注意不同的名称和值;true意味着https,false意味着空白)。 |
string | HTTPS | 低 | 只读 | |
zookeeper.ssl.ocsp.enable | 指定是否启用ZooKeeper TLS协议中的在线证书状态协议。
覆盖任何通过zookeeper.ssl.ocsp系统属性设置的显式值(注意是短名)。 |
boolean | false | 低 | 只读 | |
kafka >= 2.7 | ||||||
ssl.truststore.certificates | 可信证书的格式由'ssl.truststore.type'指定。
默认的SSL引擎工厂只支持带X.509证书的PEM格式。 |
password | null | 中间 | 每个broker | |
socket.connection.setup.timeout.max.ms | 客户端等待建立socket连接的最大时间。
连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 |
long | 127000 (127 seconds) | 中间 | 只读 | |
socket.connection.setup.timeout.ms | 客户端等待建立socket连接的时间。
如果在超时之前没有建立连接,客户端将关闭socket通道。 |
long | 10000 (10 seconds) | 中间 | 只读 |
Topic
与topic相关的配置,服务器的默认值,也可选择的覆盖指定的topic。
- 如果没有给出指定topic的配置,则将使用服务器默认值。
- 可以通过“-config”选项在topic创建时设置。
此示例使用自定义最大消息的大小和刷新率,创建一个名为 my-topic 的topic:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
也可以使用“alter configs”命令修改或设置。 此示例修改更新 my-topic 的最大的消息大小:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000
可以执行以下命令验证结果:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
移除设置:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 服务器默认属性 | 更新模式 |
---|---|---|---|---|---|---|
cleanup.policy | “delete”或“compact”。指定在旧的日志段的保留策略。
默认策略(“delete”),将达到保留时间或大小限制的日志废弃。 “compact”则压缩日志。 |
list | delete | [compact, delete] | log.cleanup.policy | 中 |
compression.type | 针对指定的topic设置最终的压缩方式。
标准的压缩格式有'gzip', 'snappy', lz4。还可以设置'uncompressed',就是不压缩;设置为'producer'这意味着保留生产者设置的原始压缩编解码。 |
string | producer | [uncompressed, snappy, lz4, gzip, producer] | compression.type | 中 |
delete.retention.ms | 保留删除消息压缩topic的删除标记的时间。此设置还给出消费者如果从offset 0开始读取并确保获得最终阶段的有效快照的时间范围(否则,在完成扫描之前可能已经回收了)。 | long | 86400000 | [0,...] | log.cleaner.delete.retention.ms | 中 |
file.delete.delay.ms | 从文件系统中删除文件之前等待的时间 | long | 60000 | [0,...] | log.segment.delete.delay.ms | 中 |
flush.messages | 此设置允许指定我们强制fsync写入日志的数据的间隔。
例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。
|
long | 9223372036854775807 | [0,...] | log.flush.interval.messages | 中 |
flush.ms | 此设置允许我们强制fsync写入日志的数据的时间间隔。
例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。
|
long | 9223372036854775807 | [0,...] | log.flush.interval.ms | 中 |
follower.replication.throttled.replicas | follower复制限流列表。
该列表应以[PartitionId]的形式描述一组副本:[BrokerId],[PartitionId]:[BrokerId]:...或者通配符'*'可用于限制此topic的所有副本。 |
list | "" | [partitionId],[brokerId]:[partitionId],[brokerId]:... | follower.replication.throttled.replicas | 中 |
index.interval.bytes | 此设置控制Kafka向其offset索引添加索引条目的频率。默认设置确保我们大致每4096个字节索引消息。
更多的索引允许读取更接近日志中的确切位置,但使索引更大。你不需要改变这个值。 |
int | 4096 | [0,...] | log.index.interval.bytes | 中 |
leader.replication.throttled.replicas | 在leader方面进行限制的副本列表。
该列表设置以[PartitionId]的形式描述限制副本:[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...或使用通配符‘*’限制该topic的所有副本。 |
list | "" | [partitionId],[brokerId]:[partitionId],[brokerId]:... | leader.replication.throttled.replicas | 中 |
max.message.bytes | kafka允许的最大的消息批次大小。
如果增加此值,并且消费者的版本比0.10.2老,那么消费者的提取的大小也必须增加,以便他们可以获取大的消息批次。 在最新的消息格式版本中,消息总是分组批量来提高效率。 在之前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个消息。 |
int | 1000012 | [0,...] | message.max.bytes | 中 |
message.format.version | 指定消息附加到日志的消息格式版本。
该值应该是一个有效的ApiVersion。例如:0.8.2, 0.9.0.0, 0.10.0,更多细节检查ApiVersion。 通过设置特定的消息格式版本,并且磁盘上的所有现有消息都小于或等于指定版本。 不正确地设置此值将导致消费者使用旧版本,因为他们将接收到“不认识”的格式的消息。 |
string | 0.11.0-IV2 | log.message.format.version | 中 | |
min.cleanable.dirty.ratio | 此配置控制日志压缩程序将尝试清除日志的频率(假设启用了日志压缩)。
默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。 更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。 |
double | 0.5 | [0,...,1] | log.cleaner.min.cleanable.ratio | 中 |
min.compaction.lag.ms | 消息在日志中保持不压缩的最短时间。
仅适用于正在压缩的日志。 |
long | 0 | [0,...] | log.cleaner.min.compaction.lag.ms | 中 |
min.insync.replicas | 当生产者设置应答为"all"(或“-1”)时,此配置指定了成功写入的副本应答的最小数。
如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend) 当min.insync.replicas和acks强制更大的耐用性时。 典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。 如果多数副本没有收到写入,这将确保生产者引发异常。 |
int | 1 | [1,...] | min.insync.replicas | 中 |
preallocate | 如果我们在创建新的日志段时在磁盘上预分配该文件,那么设为True。 | boolean | false | log.preallocate | 中 | |
retention.bytes | 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧的日志段以释放空间。
默认情况下,没有设置大小限制则仅限于时间限制。 |
long | -1 | log.retention.bytes | 中 | |
retention.ms | 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。
这代表SLA消费者必须读取数据的时间长度。 |
long | 604800000 | log.retention.ms | 中 | |
segment.bytes | 此配置控制日志的段文件大小。一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 | int | 1073741824 | [14,...] | log.segment.bytes | 中 |
segment.index.bytes | 此配置控制offset映射到文件位置的索引的大小。我们预先分配此索引文件,并在日志滚动后收缩它。通常不需要更改此设置。 | int | 10485760 | [0,...] | log.index.size.max.bytes | 中 |
segment.jitter.ms | 从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动 | long | 0 | [0,...] | log.roll.jitter.ms | 中 |
segment.ms | 此配置控制Kafka强制日志滚动的时间段,以确保保留可以删除或压缩旧数据,即使段文件未满。 | long | 604800000 | [0,...] | log.roll.ms | 中 |
unclean.leader.election.enable | 是否将不在ISR中的副本作为最后的手段选举为leader,即使这样做可能会导致数据丢失。 | boolean | false | unclean.leader.election.enable | 中 | |
kafka > 2.0 | ||||||
message.downconversion.enable | 此配置控制是否启用消息格式的向下转换以满足消费请求。
当设置为false时,broker不会对期待旧消息格式的消费者执行向下转换。broker 会对来自此类旧客户端的消费请求作出 UNSUPPORTED_VERSION 错误响应。
|
boolean | false | log.message.downconversion.enable | 低 |
Producer
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
bootstrap.servers | host/port列表,用于初始化建立和Kafka集群的连接。
列表格式为host1:port1,host2:port2,....,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(你可以多提供几个,以防提供的服务器关闭) |
list | 高 | ||
key.serializer | 实现 org.apache.kafka.common.serialization.Serializer 接口的 key 的 Serializer 类。 | class | 高 | ||
value.serializer | 实现 org.apache.kafka.common.serialization.Serializer 接口的value 的 Serializer 类。 | class | 高 | ||
acks | 生产者需要leader确认请求完成之前接收的应答数。
此配置控制了发送消息的耐用性,支持以下配置:
|
string | 1 | [all, -1, 0, 1] | 高 |
buffer.memory | 生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。
此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。 |
long | 33554432 | [0,...] | 高 |
compression.type | 数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。
压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。 |
string | none | 高 | |
retries | 设置一个比零大的值,客户端如果发送失败则会重新发送。
注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。 如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。 |
int | 0 | [0,...,2147483647] | 高 |
ssl.key.password | 密钥仓库文件中的私钥的密码。 | password | null | 高 | |
ssl.keystore.location | 密钥仓库文件的位置。可用于客户端的双向认证。 | string | null | 高 | |
ssl.keystore.password | 密钥仓库文件的仓库密码。
只有配置了ssl.keystore.location时才需要。 |
password | null | 高 | |
ssl.truststore.location | 信任仓库的位置 | string | null | 高 | |
ssl.truststore.password | 信任仓库文件的密码 | password | null | 高 | |
batch.size | 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。
这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位): 不会打包大于此配置大小的消息。 发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。 较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。 |
int | 16384 | [0,...] | 中 |
client.id | 当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。 | string | "" | 中 | |
connections.max.idle.ms | 多少毫秒之后关闭闲置的连接。 | long | 540000 | 中 | |
linger.ms | 生产者组将发送的消息组合成单个批量请求。
正常情况下,只有消息到达的速度比发送速度快的情况下才会出现。但是,在某些情况下,即使在适度的负载下,客户端也可能希望减少请求数量。此设置通过添加少量人为延迟来实现。- 也就是说,不是立即发出一个消息,生产者将等待一个给定的延迟,以便和其他的消息可以组合成一个批次。这类似于Nagle在TCP中的算法。此设置给出批量延迟的上限:一旦我们达到分区的batch.size值的记录,将立即发送,不管这个设置如何,但是,如果比这个小,我们将在指定的“linger”时间内等待更多的消息加入。此设置默认为0(即无延迟)。假设,设置 linger.ms=5,将达到减少发送的请求数量的效果,但对于在没有负载情况,将增加5ms的延迟。 |
long | 0 | [0,...] | 中 |
max.block.ms | 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。
此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。 |
long | 60000 | [0,...] | 中 |
max.request.size | 请求的最大大小(以字节为单位)。
此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这也是最大消息批量大小的上限。 请注意,服务器拥有自己的批量大小,可能与此不同。 |
int | 1048576 | [0,...] | 中 |
partitioner.class | 实现Partitioner接口的的Partitioner类。 | class | org.apache.kafka.clients.producer.internals.DefaultPartitioner | 中 | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。
如果值为-1,则将使用OS默认值。 |
int | 32768 | [-1,...] | 中 |
request.timeout.ms | 该配置控制客户端等待请求响应的最长时间。
如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。 |
int | 30000 | [0,...] | 中 |
sasl.jaas.config | JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:'(=)*;' | password | null | 中 | |
sasl.kerberos.service.name | Kafka运行的Kerberos主体名称。
可以在Kafka的JAAS配置或Kafka的配置中定义。 |
string | null | 中 | |
sasl.mechanism | SASL机制用于客户端连接。这是安全提供者可用与任何机制。
GSSAPI是默认机制。 |
string | GSSAPI | 中 | |
security.protocol | 用于与broker通讯的协议。
有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 |
string | PLAINTEXT | 中 | |
send.buffer.bytes | 发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。
如果值为 -1,将默认使用系统的。 |
int | 131072 | [-1,...] | 中 |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | 中 | |
ssl.keystore.type | 密钥存储文件的文件格式。
对于客户端是可选的。 |
string | JKS | 中 | |
ssl.protocol | 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。
较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 |
string | TLS | 中 | |
ssl.provider | 用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。 | string | null | 中 | |
ssl.truststore.type | 信任仓库文件的文件格式。 | string | JKS | 中 | |
enable.idempotence | 当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。
即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。 另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 |
boolean | false | 低 | |
interceptor.classes | 实现ProducerInterceptor接口,你可以在生产者发布到Kafka群集之前拦截(也可变更)生产者收到的消息。
默认情况下没有拦截器。 |
list | null | 低 | |
max.in.flight.requests.per.connection | 阻塞之前,客户端单个连接上发送的未应答请求的最大数量。
注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。 |
int | 5 | [1,...] | 低 |
metadata.max.age.ms | 在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。 | long | 300000 | [0,...] | 低 |
metric.reporters | 用作metrics reporters(指标记录员)的类的列表。
实现MetricReporter接口,将受到新增加的度量标准创建类插入的通知。 JmxReporter始终包含在注册JMX统计信息中。 |
list | "" | 低 | |
metrics.num.samples | 维护用于计算度量的样例数量。 | int | 2 | [1,...] | 低 |
metrics.recording.level | 指标的最高记录级别。 | string | INFO | [INFO, DEBUG] | 低 |
metrics.sample.window.ms | 度量样例计算上 | long | 30000 | [0,...] | 低 |
reconnect.backoff.max.ms | 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。
如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 |
long | 1000 | [0,...] | 低 |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。 | long | 50 | [0,...] | 低 |
retry.backoff.ms | 尝试重试指定topic分区的失败请求之前等待的时间。
这样可以避免在某些故障情况下高频次的重复发送请求。 |
long | 100 | [0,...] | 低 |
sasl.kerberos.kinit.cmd | Kerberos kinit 命令路径。 | string | /usr/bin/kinit | 低 | |
sasl.kerberos.min.time.before.relogin | Login线程刷新尝试之间的休眠时间。 | long | 60000 | 低 | |
sasl.kerberos.ticket.renew.jitter | 添加更新时间的随机抖动百分比。 | double | 0.05 | 低 | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。 | double | 0.8 | 低 | |
ssl.cipher.suites | 密码套件列表。这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。 | list | null | 低 | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | 低 | |
ssl.keymanager.algorithm | 用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | 低 | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | 低 | |
ssl.trustmanager.algorithm | 用于SSL连接的信任管理因子算法。
默认值是JAVA虚拟机配置的信任管理工厂算法。 |
string | PKIX | 低 | |
transaction.timeout.ms | 生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。
如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。 |
int | 60000 | 低 | |
transactional.id | 用于事务传递的TransactionalId。
这样可以跨多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。 如果没有提供TransactionalId,则生产者被限制为幂等传递。 请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。 |
string | null | non-empty string | 低 |
kafka >= 2.0.0 | |||||
sasl.client.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 | class | null | 中间 | |
sasl.login.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。
对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class | null | 中 | |
sasl.login.class | 实现Login接口的类的全称。
对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。 |
class | null | 中 | |
kafka >= 2.1.0 | |||||
client.dns.lookup | 控制客户端如何使用DNS查询。如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。
一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。 |
string | use_all_dns_ips | [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only] | 中 |
delivery.timeout.ms | 调用send()返回后报告成功或失败的时间上限。
这限制了消息在发送前被延迟的总时间,等待broker确认的时间(如果期望的话),以及允许重试发送失败的时间。如果遇到不可恢复的错误,重试次数已经用尽,或者消息被添加到一个达到较早发送到期期限的批次中,生产者可能会报告未能在这个配置之前发送记录。 这个配置的值应该大于或等于request.timeout.ms和linger.ms之和。 |
int | 120000 (2 minutes) | [0,...] | 中 |
kafka >= 2.7 | |||||
ssl.truststore.certificates | 可信证书的格式由'ssl.truststore.type'指定。
默认的SSL引擎工厂只支持带X.509证书的PEM格式。 |
password | null | 高 | |
socket.connection.setup.timeout.max.ms | 客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。
为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 |
long | 127000 (127 seconds) | 中 | |
socket.connection.setup.timeout.ms | 客户端等待建立socket连接的时间。
如果在超时之前没有建立连接,客户端将关闭socket通道。 |
long | 10000 (10 seconds) | 中 |
Consumer
在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者。新老客户端的配置如下。
新消费者配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
bootstrap.servers | host/port,用于和kafka集群建立初始化连接。
因为这些服务器地址仅用于初始化连接,并通过现有配置的来发现全部的kafka集群成员(集群随时会变化),所以此列表不需要包含完整的集群地址(但尽量多配置几个,以防止配置的服务器宕机)。 |
list | 高 | ||
key.deserializer | key的解析序列化接口实现类(Deserializer)。 | class | 高 | ||
value.deserializer | value的解析序列化接口实现类(Deserializer) | class | 高 | ||
fetch.min.bytes | 服务器哦拉取请求返回的最小数据量,如果数据不足,请求将等待数据积累。
默认设置为1字节,表示只要单个字节的数据可用或者读取等待请求超时,就会应答读取请求。 将此值设置的越大将导致服务器等待数据累积的越长,这可能以一些额外延迟为代价提高服务器吞吐量。 |
int | 1 | [0,...] | 高 |
group.id | 此消费者所属消费者组的唯一标识。如果消费者用于订阅或offset管理策略的组管理功能,则此属性是必须的。 | string | "" | 高 | |
heartbeat.interval.ms | 当使用Kafka的分组管理功能时,心跳到消费者协调器之间的预计时间。心跳用于确保消费者的会话保持活动状态,并当有新消费者加入或离开组时方便重新平衡。
该值必须必比session.timeout.ms小,通常不高于1/3。它可以调整的更低,以控制正常重新平衡的预期时间。 |
int | 3000 | 高 | |
max.partition.fetch.bytes | 服务器将返回每个分区的最大数据量。
如果拉取的第一个非空分区中第一个消息大于此限制,则仍然会返回消息,以确保消费者可以正常的工作。 broker接受的最大消息大小通过message.max.bytes(broker config)或max.message.bytes (topic config)定义。 参阅fetch.max.bytes以限制消费者请求大小。 |
int | 1048576 | [0,...] | 高 |
session.timeout.ms | 用于发现消费者故障的超时时间。
消费者周期性的发送心跳到broker,表示其还活着。如果会话超时期满之前没有收到心跳,那么broker将从分组中移除消费者,并启动重新平衡。 请注意,该值必须在broker配置的group.min.session.timeout.ms和group.max.session.timeout.ms允许的范围内。 |
int | 10000 | 高 | |
ssl.key.password | 密钥存储文件中的私钥的密码。客户端可选 | password | null | 高 | |
ssl.keystore.location | 密钥存储文件的位置, 这对于客户端是可选的,并且可以用于客户端的双向认证。 | string | null | 高 | |
ssl.keystore.password | 密钥仓库文件的仓库密码。
客户端可选,只有ssl.keystore.location配置了才需要。 |
password | null | 高 | |
ssl.truststore.location | 信任仓库文件的位置 | string | null | 高 | |
ssl.truststore.password | 信任仓库文件的密码 | password | null | 高 | |
auto.offset.reset | 当Kafka中没有初始offset或如果当前的offset不存在时(例如,该数据被删除了),该怎么办。
|
string | latest | [latest, earliest, none] | 中 |
connections.max.idle.ms | 指定在多少毫秒之后关闭闲置的连接 | long | 540000 | 中 | |
enable.auto.commit | 如果为true,消费者的offset将在后台周期性的提交 | boolean | true | 中 | |
exclude.internal.topics | 内部topic的记录(如偏移量)是否应向消费者公开。如果设置为true,则从内部topic接受记录的唯一方法是订阅它。 | boolean | true | 中 | |
fetch.max.bytes | 服务器为拉取请求返回的最大数据值。
这不是绝对的最大值,如果在第一次非空分区拉取的第一条消息大于该值,该消息将仍然返回,以确保消费者继续工作。 接收的最大消息大小通过message.max.bytes (broker config) 或 max.message.bytes (topic config)定义。 注意,消费者是并行执行多个提取的。 |
int | 52428800 | [0,...] | 中 |
max.poll.interval.ms | 使用消费者组管理时poll()调用之间的最大延迟。消费者在获取更多记录之前可以空闲的时间量的上限。
如果此超时时间期满之前poll()没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。 |
int | 300000 | [1,...] | 中 |
max.poll.records | 在单次调用poll()中返回的最大记录数。 | int | 500 | [1,...] | 中 |
partition.assignment.strategy | 当使用组管理时,客户端将使用分区分配策略的类名来分配消费者实例之间的分区所有权 | list | class org.apache.kafka.clients.consumer.RangeAssignor | 中 | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。
如果值为-1,则将使用OS默认值。 |
int | 65536 | [-1,...] | 中 |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。
如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽则客户端将重新发送请求。 |
int | 305000 | [0,...] | 中 |
sasl.jaas.config | JAAS配置文件中SASL连接登录上下文参数。
这里描述JAAS配置文件格式。该值的格式为: '(=)*;' |
password | null | 中 | |
sasl.kerberos.service.name | Kafka运行Kerberos principal名。可以在Kafka的JAAS配置文件或在Kafka的配置文件中定义。 | string | null | 中 | |
sasl.mechanism | 用于客户端连接的SASL机制。安全提供者可用的机制。GSSAPI是默认机制。 | string | GSSAPI | 中 | |
security.protocol | 用于与broker通讯的协议。
有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 |
string | PLAINTEXT | 中 | |
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。
如果值为-1,则将使用OS默认值。 |
int | 131072 | [-1,...] | 中 |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | 中 | |
ssl.keystore.type | key仓库文件的文件格式,客户端可选。 | string | JKS | 中 | |
ssl.protocol | 用于生成SSLContext的SSL协议。
默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 |
string | TLS | 中 | |
ssl.provider | 用于SSL连接的安全提供程序的名称。
默认值是JVM的默认安全提供程序。 |
string | null | 中 | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | JKS | 中 | |
auto.commit.interval.ms | 如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。 | int | 5000 | [0,...] | 低 |
check.crcs | 自动检查CRC32记录的消耗。
这样可以确保消息发生时不会在线或磁盘损坏。 此检查增加了一些开销,因此在寻求极致性能的情况下可能会被禁用。 |
boolean | true | 低 | |
client.id | 在发出请求时传递给服务器的id字符串。
这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来跟踪ip/port的请求源。 |
string | "" | 低 | |
fetch.max.wait.ms | 如果没有足够的数据满足fetch.min.bytes,服务器将在接收到提取请求之前阻止的最大时间。 | int | 500 | [0,...] | 低 |
interceptor.classes | 用作拦截器的类的列表。
你可实现ConsumerInterceptor接口以允许拦截(也可能变化)消费者接收的记录。 默认情况下,没有拦截器。 |
list | null | 低 | |
metadata.max.age.ms | 在一定时间段之后(以毫秒为单位的),强制更新元数据,即使没有任何分区领导变化,任何新的broker或分区。 | long | 300000 | [0,...] | 低 |
metric.reporters | 用作度量记录员类的列表。
实现MetricReporter接口以允许插入通知新的度量创建的类。 JmxReporter始终包含在注册JMX统计信息中。 |
list | "" | 低 | |
metrics.num.samples | 保持的样本数以计算度量。 | int | 2 | [1,...] | 低 |
metrics.recording.level | 最高的记录级别。 | string | INFO | [INFO, DEBUG] | 低 |
metrics.sample.window.ms | The window of time a metrics sample is computed over. | long | 30000 | [0,...] | 低 |
reconnect.backoff.ms | 尝试重新连接指定主机之前等待的时间,避免频繁的连接主机,这种机制适用于消费者向broker发送的所有请求。 | long | 50 | [0,...] | 低 |
retry.backoff.ms | 尝试重新发送失败的请求到指定topic分区之前的等待时间。
避免在某些故障情况下,频繁的重复发送。 |
long | 100 | [0,...] | 低 |
sasl.kerberos.kinit.cmd Kerberos | kinit命令路径。 | string | /usr/bin/kinit | 低 | |
sasl.kerberos.min.time.before.relogin | 尝试/恢复之间的登录线程的休眠时间。 | long | 60000 | 低 | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | double | 0.05 | 低 | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将休眠,直到从上次刷新到ticket的指定的时间窗口因子到期,此时将尝试续订ticket。 | double | 0.8 | 低 | |
ssl.cipher.suites | 密码套件列表,用于TLS或SSL网络协议的安全设置,认证,加密,MAC和密钥交换算法的明明组合。
默认情况下,支持所有可用的密码套件。 |
list | null | 低 | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | 低 | |
ssl.keymanager.algorithm | 密钥管理器工厂用于SSL连接的算法。
默认值是为Java虚拟机配置的密钥管理器工厂算法。 |
string | SunX509 | 低 | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | 低 | |
ssl.trustmanager.algorithm | 信任管理器工厂用于SSL连接的算法。
默认值是为Java虚拟机配置的信任管理器工厂算法。 |
string | PKIX | 低 | |
kafka >= 2.0.0 | |||||
sasl.client.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 | class | null | 中 | |
sasl.login.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。
对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class | null | 中 | |
sasl.login.class | 实现Login接口的类的全称。
对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。 |
class | null | 中 | |
kafka >= 2.1.0 | |||||
client.dns.lookup | 控制客户端如何使用DNS查询。
|
string | use_all_dns_ips | [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only] | 中 |
kafka >= 2.7 | |||||
ssl.truststore.certificates | 可信证书的格式由'ssl.truststore.type'指定。
默认的SSL引擎工厂只支持带X.509证书的PEM格式。 |
password | null | 高 | |
socket.connection.setup.timeout.max.ms | 客户端等待建立socket连接的最大时间。
连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 |
long | 127000 (127 seconds) | 中 | |
socket.connection.setup.timeout.ms | 客户端等待建立socket连接的时间。
如果在超时之前没有建立连接,客户端将关闭socket通道。 |
long | 10000 (10 seconds) | 中 |
旧消费者配置:
- group.id
- zookeeper.connect
名称 | 默认值 | 描述 |
---|---|---|
group.id | 标识消费者所属消费者组(独一的)。通过设置相同的组ID,多个消费者表明属于该消费者组的一部分。 | |
zookeeper.connect | 指定ZooKeeper连接字符串,格式为hostname:port,其中host和port是ZooKeeper服务器的主机和端口。 为了使ZooKeeper宕机时连接到其他ZooKeeper节点,你还可以以hostname1:host1,hostname2:port2,hostname3:port3的形式指定多个主机。
还可以设置ZooKeeper chroot路径,作为其ZooKeeper连接字符串的一部分,将其数据放置在全局ZooKeeper命名空间中的某个路径下。 如果是这样,消费者应该在其连接字符串中使用相同的chroot路径。 例如,要给出/chroot/path的chroot路径,你需要将该值设置为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path。 | |
consumer.id | null | 如果未设置将自动生成。 |
socket.timeout.ms | 30 * 1000 | 网络请求socker的超时时间。实际的超时是 max.fetch.wait+socket.timeout.ms的时间。 |
socket.receive.buffer.bytes | 64 * 1024 | 网络请求socker的接收缓存大小 |
fetch.message.max.bytes | 1024 * 1024 | 每个拉取请求的每个topic分区尝试获取的消息的字节大小。这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 拉取请求的大小至少与服务器允许的最大消息的大小一样大,否则生产者可能发送大于消费者可以拉取的消息。 |
num.consumer.fetchers | 1 | 用于拉取数据的拉取线程数。 |
auto.commit.enable | true | 如果为true,请定期向ZooKeeper提交消费者已经获取的消息的偏移量。 当进程失败时,将使用这种承诺偏移量作为新消费者开始的位置。 |
auto.commit.interval.ms | 60 * 1000 | 消费者offset提交到zookeeper的频率(以毫秒为单位) |
queued.max.message.chunks | 2 | 消费缓存消息块的最大大小。每个块可以达到fetch.message.max.bytes。 |
rebalance.max.retries | 4 | 当新的消费者加入消费者组时,消费者集合尝试“重新平衡”负载,并为每个消费者分配分区。如果消费者集合在分配时发生时发生变化,则重新平衡将失败并重试。此设置控制尝试之前的最大尝试次数。 |
fetch.min.bytes | 1 | 拉取请求返回最小的数据量。如果没有足够的数据,请求将等待数据积累,然后应答请求。 |
fetch.wait.max.ms | 100 | 如果没有足够的数据(fetch.min.bytes),服务器将在返回请求数据之前阻塞的最长时间。 |
rebalance.backoff.ms | 2000 | 重新平衡时重试之间的回退时间。如果未设置,则使用zookeeper.sync.time.ms中的值。 |
refresh.leader.backoff.ms | 200 | 回退时间等待,然后再尝试选举一个刚刚失去leader的分区。 |
auto.offset.reset | largest | 如果ZooKeeper中没有初始偏移量,或偏移值超出范围,该怎么办?
|
consumer.timeout.ms | -1 | 如果在指定的时间间隔后没有消息可用,则向用户发出超时异常 |
exclude.internal.topics | true | 来自内部topic的消息(如偏移量)是否应该暴露给消费者。 |
client.id | group id value | 客户端ID是每个请求中发送的用户指定的字符串,用于帮助跟踪调用。 它应该逻辑地标识发出请求的应用程序。 |
zookeeper.session.timeout.ms | 6000 | ZooKeeper会话超时。如果消费者在这段时间内没有对ZooKeeper心跳,那么它被认为是死亡的,并且会发生重新平衡。 |
zookeeper.connection.timeout.ms | 6000 | 与zookeeper建立连接时客户端等待的最长时间。 |
zookeeper.sync.time.ms | 2000 | ZK follower可以罗ZK leader多久 |
offsets.storage | zookeeper | 选择存储偏移量的位置(zookeeper或kafka)。 |
offsets.channel.backoff.ms | 1000 | 重新连接offset通道或重试失败的偏移提取/提交请求时的回退周期。 |
offsets.channel.socket.timeout.ms | 10000 | 读取offset拉取/提交响应的Socker的超时时间。此超时也用于查询offset manager的ConsumerMetadata请求。 |
offsets.commit.max.retries | 5 | 失败时重试偏移提交的最大次数。
|
dual.commit.enabled | true | 如果使用“kafka”作为offsets.storage,则可以向ZooKeeper(除Kafka之外)进行双重提交offset。
在从基于zookeeper的offset存储迁移到kafka存储的时候可以这么做。 对于任何给定的消费者组,在该组中的所有实例已迁移到提交到broker(而不是直接到ZooKeeper)的新的版本之后,可以关闭这个。 |
partition.assignment.strategy | range | 在“range”或“roundrobin”策略之间选择将分区分配给消费者流。
循环分区分配器分配所有可用的分区和所有可用的消费者线程。然后,继续从分区到消费者线程进行循环任务。如果所有消费者实例的订阅是相同的,则分区将被均匀分布。(即,分区所有权计数将在所有消费者线程之间的差异仅在一个delta之内。)
范围(Range)分区基于每个topic。对于每个主题,我们按数字顺序排列可用的分区,并以字典顺序排列消费者线程。然后,我们将分区数除以消费者流(线程)的总数来确定分配给每个消费者的分区数。 如果不均匀分割,那么前几个消费者将会有多的分区。 |
Streams
Kafka Stream客户端库配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
application.id | 流处理应用程序标识。必须在Kafka集群中是独一无二的。
1)默认客户端ID前缀,2)成员资格管理的group-id,3)changgelog的topic前缀 |
string | 高 | ||
bootstrap.servers | 用于建立与Kafka集群的初始连接的主机/端口列表。
客户端将会连接所有服务器,跟指定哪些服务器无关 - 通过指定的服务器列表会自动发现全部的服务器。 此列表格式host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员(可能会动态更改),所以此列表不需要包含完整集 的服务器(您可能需要多个服务器,以防指定的服务器关闭)。 |
list | 高 | ||
replication.factor | 流处理程序创建更改日志topic和重新分配topic的副本数 | int | 1 | 高 | |
state.dir | 状态存储的目录地址。 | string | /tmp/kafka-streams | 高 | |
cache.max.bytes.buffering | 用于缓冲所有线程的最大内存字节数 | long | 10485760 | [0,...] | 低 |
client.id | 发出请求时传递给服务器的id字符串。
这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来追踪请求源的ip/port。 |
string | "" | 高 | |
default.key.serde | 用于实现Serde接口的key的默认序列化器/解串器类。 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde | 中 | |
default.timestamp.extractor | 实现TimestampExtractor接口的默认时间戳提取器类。 | class | org.apache.kafka.streams.processor.FailOnInvalidTimestamp | 中 | |
default.value.serde | 用于实现Serde接口的值的默认serializer / deserializer类。 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde | 中 | |
num.standby.replicas | 每个任务的备用副本数。 | int | 0 | 低 | |
num.stream.threads | 执行流处理的线程数。 | int | 1 | 低 | |
processing.guarantee | 应使用的加工保证。
可能的值为at_least_once(默认)和exact_once。 |
string | at_least_once | [at_least_once, exactly_once] | 中 |
security.protocol | 用于与broker沟通的协议。
有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 |
string | PLAINTEXT | 中 | |
application.server | host:port指向用户嵌入定义的末端,可用于发现单个KafkaStreams应用程序中状态存储的位置 | string | "" | 低 | |
buffered.records.per.partition | 每个分区缓存的最大记录数。 | int | 1000 | 低 | |
commit.interval.ms | 用于保存process位置的频率。
注意,如果'processing.guarantee'设置为'exact_once',默认值为100,否则默认值为30000。 |
long | 30000 | 低 | |
connections.max.idle.ms | 关闭闲置的连接时间(以毫秒为单位)。 | long | 540000 | 中 | |
key.serde | 用于实现Serde接口的key的Serializer/deserializer类.此配置已被弃用,请改用default.key.serde | class | null | 低 | |
metadata.max.age.ms | 即使我们没有看到任何分区leader发生变化,主动发现新的broker或分区,强制更新元数据时间(以毫秒为单位)。 | long | 300000 | [0,...] | 低 |
metric.reporters | metric reporter的类列表。
实现MetricReporter接口,JmxReporter始终包含在注册JMX统计信息中。 |
list | "" | 低 | |
metrics.num.samples | 保持的样本数以计算度量。 | int | 2 | [1,...] | 低 |
metrics.recording.level | 日志级别。 | string | INFO | [INFO, DEBUG] | 低 |
metrics.sample.window.ms | 时间窗口计算度量标准。 | long | 30000 | [0,...] | 低 |
partition.grouper | 实现PartitionGrouper接口的Partition grouper类。 | class | org.apache.kafka.streams.processor.DefaultPartitionGrouper | 中 | |
poll.ms | 阻塞输入等待的时间(以毫秒为单位)。 | long | 100 | 低 | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。
如果值为-1,则将使用OS默认值。 |
int | 32768 | [0,...] | 中 |
reconnect.backoff.max.ms | 因故障无法重新连接broker,重新连接的等待的最大时间(毫秒)。
如果提供,每个主机会连续增加,直到达到最大值。随机递增20%的随机抖动以避免连接风暴。 |
long | 1000 | [0,...] | 低 |
reconnect.backoff.ms | 尝试重新连接之前等待的时间。避免在高频繁的重复连接服务器。
这种backoff适用于消费者向broker发送的所有请求。 |
long | 50 | [0,...] | 低 |
request.timeout.ms | 控制客户端等待请求响应的最长时间。
如果在配置时间内未收到响应,客户端将在需要时重新发送请求,如果重试耗尽,则请求失败。 |
int | 40000 | [0,...] | 低 |
retry.backoff.ms | 尝试重试失败请求之前等待的时间。以避免了在某些故障情况下,在频繁重复发送请求。 | long | 100 | [0,...] | 低 |
rocksdb.config.setter | 一个Rocks DB配置setter类,或实现RocksDBConfigSetter接口的类名 | null | 低 | ||
send.buffer.bytes | 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。
如果值为-1,则将使用OS默认值。 |
int | 131072 | [0,...] | 低 |
state.cleanup.delay.ms | 在分区迁移删除状态之前等待的时间(毫秒)。 | long | 60000 | 低 | |
timestamp.extractor | 实现TimestampExtractor接口的Timestamp抽取器类。
|
class | null | 低 | |
windowstore.changelog.additional.retention.ms | 添加到Windows维护管理器以确保数据不会从日志中过早删除。
默认为1天 |
long | 86400000 | 低 | |
zookeeper.connect | Zookeeper连接字符串,用于Kafka主题管理。
|
string | "" | 低 |
Connect
Kafka Connect框架的相关配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
config.storage.topic | kafka topic仓库配置 | string | 高 | ||
group.id | 唯一的字符串,用于标识此worker所属的Connect集群组。 | string | 高 | ||
key.converter | 用于Kafka Connect和写入到Kafka的序列化消息的之间格式转换的转换器类。
这可以控制写入或从kafka读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 |
class | 高 | ||
offset.storage.topic | 连接器的offset存储到哪个topic中 | string | 高 | ||
status.storage.topic | 追踪连接器和任务状态存储到哪个topic中 | string | 高 | ||
value.converter | 用于Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。
控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 |
class | 高 | ||
internal.key.converter | 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。
这可以控制写入或从Kafka读取的消息中的key的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。 |
class | 低 | ||
internal.value.converter | 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。
这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。 |
class | 低 | ||
bootstrap.servers | 用于建立与Kafka集群的初始连接的主机/端口列表。此列表用来发现完整服务器集的初始主机。
该列表的格式应为host1:port1,host2:port2,.... 由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此,不需要包含完整的服务器(尽管如此,你需要多配置几个,以防止配置的宕机)。 |
list | localhost:9092 | 高 | |
heartbeat.interval.ms | 心跳间隔时间。心跳用于确保会话保持活动,并在新成员加入或离开组时进行重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 | int | 3000 | 高 | |
rebalance.timeout.ms | 限制所有组中消费者的任务处理数据和提交offset所需的时间。
如果超时,那么woker将从组中删除,这也将导致offset提交失败。 |
int | 60000 | 高 | |
session.timeout.ms | 用于察觉worker故障的超时时间。
worker定时发送心跳以表明自己是活着的。如果broker在会话超时时间到期之前没有接收到心跳,那么broker将从分组中移除该worker,并启动重新平衡。 注意,该值必须在group.min.session.timeout.ms和group.max.session.timeout.ms范围内。 |
int | 10000 | 高 | |
ssl.key.password | 密钥存储文件中私钥的密码。
这对于客户端是可选的。 |
password | null | 高 | |
ssl.keystore.location | 密钥存储文件的位置。
这对于客户端是可选的,可以用于客户端的双向身份验证。 |
string | null | 高 | |
ssl.keystore.password | 密钥存储文件的存储密码。
客户端是可选的,只有配置了ssl.keystore.location才需要。 |
password | null | 高 | |
ssl.truststore.location | 信任存储文件的位置。 | string | null | 高 | |
ssl.truststore.password | 信任存储文件的密码。 | password | null | 高 | |
connections.max.idle.ms | 多少毫秒之后关闭空闲的连接。 | long | 540000 | 中 | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。
如果值为-1,则将使用OS默认值。 |
int | 32768 | [0,...] | 中 |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。
如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 |
int | 40000 | [0,...] | 中 |
sasl.jaas.config | 用于JAAS配置文件的SASL连接的JAAS登录上下文参数格式。
这里描述了JAAS配置文件的格式。该值的格式为:' (=)*;' |
password | null | 中 | |
sasl.kerberos.service.name | Kafka运行的Kerberos principal名称。
可以在Kafka的JAAS配置或Kafka的配置中定义。 |
string | null | 中 | |
sasl.mechanism | 用户客户端连接的SASL机制。可以提供者任何安全机制。
GSSAPI是默认机制。 |
string | GSSAPI | 中 | |
security.protocol | 用于和broker通讯的策略。
有效的值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 |
string | PLAINTEXT | 中 | |
send.buffer.bytes | 发送数据时使用TCP发送缓冲区(SO_SNDBUF)的大小。
如果值为-1,则将使用OS默认。 |
int | 131072 | [-1,...] | 中 |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | 中 | |
ssl.keystore.type | 密钥存储文件的文件格式。
对于客户端是可选的。 |
string | JKS | 中 | |
ssl.protocol | 用于生成SSLContext的SSL协议。
默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 |
string | TLS | 中 | |
ssl.provider | 用于SSL连接的安全提供程序的名称。
默认值是JVM的默认安全提供程序。 |
string | null | 中 | |
ssl.truststore.type | 信任存储文件的文件格式。 | string | JKS | 中 | |
worker.sync.timeout.ms | 当worker与其他worker不同步并需要重新同步配置时,需等待一段时间才能离开组,然后才能重新加入。 | int | 3000 | 中 | |
worker.unsync.backoff.ms | 当worker与其他worker不同步,并且无法在worker.sync.timeout.ms 期间追赶上,在重新连接之前,退出Connect集群的时间。 | int | 300000 | 中 | |
access.control.allow.methods | 通过设置Access-Control-Allow-Methods标头来设置跨源请求支持的方法。
Access-Control-Allow-Methods标头的默认值允许GET,POST和HEAD的跨源请求。 |
string | "" | 低 | |
access.control.allow.origin | 将Access-Control-Allow-Origin标头设置为REST API请求。
要启用跨源访问,请将其设置为应该允许访问API的应用程序的域,或者 *" 以允许从任何的域。 默认值只允许从REST API的域访问。 |
string | "" | 低 | |
client.id | 在发出请求时传递给服务器的id字符串。
这样做的目的是通过允许逻辑应用程序名称包含在请求消息中,来跟踪请求来源。而不仅仅是ip/port |
string | "" | 低 | |
config.storage.replication.factor | 当创建配置仓库topic时的副本数 | short | 3 | [1,...] | 低 |
metadata.max.age.ms | 在没有任何分区leader改变,主动地发现新的broker或分区的时间。 | long | 300000 | [0,...] | 低 |
metric.reporters | A list of classes to use as metrics reporters.
Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics. |
list | "" | 低 | |
metrics.num.samples | 保留计算metrics的样本数(译者不清楚是做什么的) | int | 2 | [1,...] | 低 |
metrics.sample.window.ms | The window of time a metrics sample is computed over. | long | 30000 | [0,...] | 低 |
offset.flush.interval.ms | 尝试提交任务偏移量的间隔。 | long | 60000 | 低 | |
offset.flush.timeout.ms | 在取消进程并恢复要在之后尝试提交的offset数据之前,等待消息刷新并分配要提交到offset仓库的offset数据的最大毫秒数。 | long | 5000 | 低 | |
offset.storage.partitions | 创建offset仓库topic的分区数 | int | 25 | [1,...] | 低 |
offset.storage.replication.factor | 创建offset仓库topic的副本数 | short | 3 | [1,...] | 低 |
plugin.path | 包含插件(连接器,转换器,转换)逗号(,)分隔的路径列表。
该列表应包含顶级目录,其中包括以下任何组合:a)包含jars与插件及其依赖关系的目录 b)具有插件及其依赖项的uber-jars c)包含插件类的包目录结构的目录及其依赖关系, 注意:将遵循符号链接来发现依赖关系或插件。 示例:plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors |
list | null | 低 | |
reconnect.backoff.max.ms | 无法连接broker时等待的最大时间(毫秒)。
如果设置,则每个host的将会持续的增加,直到达到最大值。 计算增加后,再增加20%的随机抖动,以避免高频的反复连接。 |
long | 1000 | [0,...] | 低 |
reconnect.backoff.ms | 尝试重新连接到主机之前等待的时间。
避免了高频率反复的连接主机。 这种机制适用于消费者向broker发送的所有请求。 |
long | 50 | [0,...] | 低 |
rest.advertised.host.name | 如果设置,其他wokers将通过这个hostname进行连接。 | string | null | 低 | |
rest.advertised.port | 如果设置,其他的worker将通过这个端口进行连接。 | int | null | 低 | |
rest.host.name | REST API的主机名。
如果设置,它将只绑定到这个接口。 |
string | null | 低 | |
rest.port | 用于监听REST API的端口 | int | 8083 | 低 | |
retry.backoff.ms | 失败请求重新尝试之前的等待时间,避免了在某些故障的情况下,频繁的重复发送请求。 | long | 100 | [0,...] | 低 |
sasl.kerberos.kinit.cmd | Kerberos kinit命令路径. | string | /usr/bin/kinit | 低 | |
sasl.kerberos.min.time.before.relogin | 尝试refresh之间登录线程的休眠时间. | long | 60000 | 低 | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | double | 0.05 | 低 | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将休眠,直到从上次刷新ticket到期,此时将尝试续订ticket。 | double | 0.8 | 低 | |
ssl.cipher.suites | 密码套件列表。用于TLS或SSL网络协议协商网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。
默认情况下,支持所有可用的密码套件。 |
list | null | 低 | |
ssl.endpoint.identification.algorithm | 末端识别算法使用服务器证书验证服务器主机名。 | string | null | 低 | |
ssl.keymanager.algorithm | 用于SSL连接的key管理工厂的算法,默认值是Java虚拟机配置的密钥管理工厂算法。 | string | SunX509 | 低 | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | 低 | |
ssl.trustmanager.algorithm | 用于SSL连接的信任管理仓库算法。
默认值是Java虚拟机配置的信任管理器工厂算法。 |
string | PKIX | 低 | |
status.storage.partitions | 用于创建状态仓库topic的分区数 | int | 5 | [1,...] | 低 |
status.storage.replication.factor | 用于创建状态仓库topic的副本数 | short | 3 | [1,...] | 低 |
task.shutdown.graceful.timeout.ms | 等待任务正常关闭的时间,这是总时间,不是每个任务,所有任务触发关闭,然后依次等待。 | long | 5000 | 低 | |
kafka >= 2.0.0 | |||||
sasl.client.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 | class | null | 中 | |
sasl.login.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。
对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class | null | 中 | |
sasl.login.class | 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。
例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。 |
class | null | 中 | |
kafka >= 2.7 | |||||
ssl.truststore.certificates | 可信证书的格式由'ssl.truststore.type'指定。
默认的SSL引擎工厂只支持带X.509证书的PEM格式。 |
password | null | 高 | |
socket.connection.setup.timeout.max.ms | 客户端等待建立socket连接的最大时间。
连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 |
long | 127000 (127 seconds) | 中 | |
socket.connection.setup.timeout.ms | 客户端等待建立socket连接的时间。
如果在超时之前没有建立连接,客户端将关闭socket通道。 |
long | 10000 (10 seconds) | 中 |
AdminClient
Kafka Admin客户端的配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
bootstrap.servers | host/port,用于和kafka集群建立初始化连接。
因为这些服务器地址仅用于初始化连接,并通过现有配置的来发现全部的kafka集群成员(集群随时会变化),所以此列表不需要包含完整的集群地址(但尽量多配置几个,以防止配置的服务器宕机)。 |
list | 高 | ||
ssl.key.password | 密钥仓库文件中的私钥密码。
对于客户端是可选的。 |
password | null | 高 | |
ssl.keystore.location | 密钥仓库文件的位置。
这对于客户端是可选的,可以用于客户端的双向认证。 |
string | null | 高 | |
ssl.keystore.password | 密钥仓库文件的仓库密钥。
这对于客户端是可选的,只有配置了ssl.keystore.location才需要。 |
password | null | 高 | |
ssl.truststore.location | 信任存储文件的位置。 | string | null | 高 | |
ssl.truststore.password | 信任存储文件的密码。
如果未设置密码,对信任库的访问仍然可用,但是完整性检查将被禁用。 |
password | null | 高 | |
client.id | 在发出请求时传递给服务器的id字符串。
这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪请求源的ip/port。 |
string | "" | 中 | |
connections.max.idle.ms | 关闭闲置连接的时间。 | long | 300000 | 中 | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。
如果值为-1,则将使用OS默认值。 |
int | 65536 | [-1,...] | 中 |
request.timeout.ms | 配置控制客户端等待请求响应的最长时间。
如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 |
int | 120000 | [0,...] | 中 |
sasl.jaas.config | JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:' (=)*;' | password | null | 中 | |
sasl.kerberos.service.name | Kafka运行的Kerberos principal名。
可以在Kafka的JAAS配置或Kafka的配置中定义。 |
string | null | 中 | |
sasl.mechanism | 用于客户端连接的SASL机制。
安全提供者可用的任何机制。GSSAPI是默认机制。 |
string | GSSAPI | 中 | |
security.protocol | 与broker通讯的协议。
有效的值有: PLAINTEXT, SSL, SASL_PLAINTEXT,SASL_SSL. |
string | PLAINTEXT | 中 | |
send.buffer.bytes | 发送数据时时使用TCP发送缓冲区(SO_SNDBUF)的大小。
如果值为-1,则使用OS默认值。 |
int | 131072 | [-1,...] | 中 |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | 中 | |
ssl.keystore.type | 密钥仓库文件的文件格式。
对于客户端是可选的。 |
string | JKS | 中 | |
ssl.protocol | 用于生成SSLContext的SSL协议。
默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用。 |
string | TLS | 中 | |
ssl.provider | 用于SSL连接的安全提供程序的名称。
默认值是JVM的默认安全提供程序。 |
string | null | 中 | |
ssl.truststore.type | 信任仓库文件的文件格式 | string | JKS | 中 | |
metadata.max.age.ms | 我们强制更新元数据的时间段(以毫秒为单位),即使我们没有任何分区leader发生变化,主动发现任何新的broker或分区。 | long | 300000 | [0,...] | 低 |
metric.reporters | 用作指标记录的类的列表。
实现MetricReporter接口,以允许插入将被通知新的度量创建的类。 JmxReporter始终包含在注册JMX统计信息中。 |
list | "" | 低 | |
metrics.num.samples | 用于计算度量维护的样例数。 | int | 2 | [1,...] | 低 |
metrics.recording.level | The highest recording level for metrics. | string | INFO | [INFO, DEBUG] | 低 |
metrics.sample.window.ms | 时间窗口计算度量标准。 | long | 30000 | [0,...] | 低 |
reconnect.backoff.max.ms | 重新连接到重复无法连接的broker程序时等待的最大时间(毫秒)。
如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 |
long | 1000 | [0,...] | 低 |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间量。
这避免了在频繁的重复连接主机。此配置适用于client对broker的所有连接尝试。 |
long | 50 | [0,...] | 低 |
retries | 在失败之前重试调用的最大次数 | int | 5 | [0,...] | 低 |
retry.backoff.ms | 尝试重试失败的请求之前等待的时间。
这样可以避免在某些故障情况下以频繁的重复发送请求。 |
long | 100 | [0,...] | 低 |
sasl.kerberos.kinit.cmd | Kerberos kinit命令路径。 | string | /usr/bin/kinit | 低 | |
sasl.kerberos.min.time.before.relogin | 刷新尝试之间的登录线程睡眠时间。 | long | 60000 | 低 | |
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动百分比。 | double | 0.05 | 低 | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将休眠,直到从上次刷新到“票”到期时间的指定窗口为止,此时将尝试续订“票”。 | double | 0.8 | 低 | |
ssl.cipher.suites | 密码套件列表。
是TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。 |
list | null | 低 | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | 低 | |
ssl.keymanager.algorithm | 用于SSL连接的密钥管理工厂算法。默认值是Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | 低 | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | 低 | |
ssl.trustmanager.algorithm | 用于SSL连接的信任管理工厂算法,默认是Java虚拟机机制。 | string | PKIX | 低 | |
kafka >= 2.0.0 | |||||
sasl.client.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 | class | null | 中 | |
sasl.login.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。
对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class | null | 中 | |
sasl.login.class | 实现Login接口的类的全称。
对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。 |
class | null | 中 | |
kafka >= 2.1.0 | |||||
client.dns.lookup | 控制客户端如何使用DNS查询。
|
string | use_all_dns_ips | [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only] | 中 |
kafka >= 2.7 | |||||
ssl.truststore.certificates | 可信证书的格式由'ssl.truststore.type'指定。
默认的SSL引擎工厂只支持带X.509证书的PEM格式。 |
password | null | 高 | |
socket.connection.setup.timeout.max.ms | 客户端等待建立socket连接的最大时间。
连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 |
long | 127000 (127 seconds) | 中 | |
socket.connection.setup.timeout.ms | 客户端等待建立socket连接的时间。
如果在超时之前没有建立连接,客户端将关闭socket通道。 |
long | 10000 (10 seconds) | 中 |