Kafka:安装、配置

来自Wikioe
跳到导航 跳到搜索


单机安装

Kafka 解压即用,并没有繁琐的安装步骤,唯一注意的是其需要 Zookeeper 支持(但其自带有 Zookeeper)。


  1. 官网下载压缩包:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
  2. 提取 tar 文件:
    $ cd opt/
    $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
    
  3. 启动服务器:
    $ cd kafka_2.11.0.9.0.0
    $ bin/kafka-server-start.sh config/server.properties
    
    服务器启动后,会得到以下响应:
    $ bin/kafka-server-start.sh config/server.properties
    [2016-01-02 15:37:30,410] INFO KafkaConfig values:
    request.timeout.ms = 30000
    log.roll.hours = 168
    inter.broker.protocol.version = 0.9.0.X
    log.preallocate = false
    security.inter.broker.protocol = PLAINTEXT
    …………………………………………….
    …………………………………………….
    
  4. 停止服务器:
    $ bin/kafka-server-stop.sh config/server.properties
    


NOTE:

  • 启动 kafka 需要执行 kafka-server-start.bat 文件,需要传入一个路径参数(server.config 文件的路径):
    1. 如果使用自行安装的 Zookeeper(先启动 Zookeeper,再启动 Kafka),使用“config/server.properties”。
    2. 如果使用 Kafka 自带的 Zookeeper(直接启动 Kafka 即可),使用:“config/zookeeper.properties”。
    properties 文件都位于“../kafka_x.xx.x.x.x.x/config”中。

使用

打开另一个命令终端启动kafka服务:

> bin/kafka-server-start.sh config/server.properties &


  1. 创建一个主题(topic)
    创建一个名为“test”的Topic,只有一个分区和一个备份:
    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
    
    创建好之后,可以通过运行以下命令,查看已创建的topic信息:
    > bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
    Topic:quickstart-events  PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: quickstart-events Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    
    或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时会自动创建topic
  2. 发送消息
    Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
    运行 producer(生产者),然后在控制台输入几条消息到服务器:
    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    This is a message
    This is another message
    
  3. 消费消息
    Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来,新打开一个命令控制台,输入:
    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    This is a message
    This is another message
    
    如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。
  4. 使用 Kafka Connect 来 导入/导出 数据
    你可能在现有的系统中拥有大量的数据,如关系型数据库或传统的消息传递系统,以及许多已经使用这些系统的应用程序。Kafka Connect允许你不断地从外部系统提取数据到Kafka,反之亦然。用Kafka整合现有的系统是非常容易的。为了使这个过程更加容易,有数百个这样的连接器现成可用。
  5. 使用Kafka Stream来处理数据
    一旦你的数据存储在Kafka中,你就可以用Kafka Streams客户端库来处理这些数据,该库适用于Java/Scala。它允许你实现自己的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka主题中。Kafka Streams将在客户端编写和部署标准Java和Scala应用程序的简单性与Kafka服务器端集群技术的优势相结合,使这些应用程序具有可扩展性、弹性、容错性和分布式。该库支持精确的一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
    实现流行的 WordCount 算法【???】:
    KStream<String, String> textLines = builder.stream("quickstart-events");
    
    KTable<String, Long> wordCounts = textLines
                .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
                .groupBy((keyIgnored, word) -> word)
                .count();
    
    wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
    
  6. 停止Kafka
    1. 使用 Ctrl-C 停止生产者和消费者客户端。
    2. 使用 Ctrl-C 停止 Kafka broker。
    3. 最后,使用 Ctrl-C 停止 ZooKeeper。
    如果你还想删除你的本地Kafka环境的数据,包括你创建的消息,运行命令。
    $ rm -rf /tmp/kafka-logs /tmp/zookeeper
    

集群搭建

背景:如安装所示,已在集群机器上安装了 Kafka。


设置多个broker集群:

  1. 为每个broker创建一个配置文件:
    > cp config/server.properties config/server-1.properties
    
    > cp config/server.properties config/server-2.properties
    
  2. 为不同broker修改配置文件:
    config/server-1.properties: 
        broker.id=1 
        listeners=PLAINTEXT://:9093 
        log.dir=/tmp/kafka-logs-1
    
    config/server-2.properties: 
        broker.id=2 
        listeners=PLAINTEXT://:9094 
        log.dir=/tmp/kafka-logs-2
    
    • broker.id 是集群中每个节点的唯一且永久的名称。
    • 【修改端口和日志目录是因为,现在在同一台机器上运行,要防止broker在同一端口上注册和覆盖对方的数据】
  3. 在启动新的kafka节点:
    > bin/kafka-server-start.sh config/server-1.properties &
    ...
    
    > bin/kafka-server-start.sh config/server-2.properties &
    ...
    

使用

  1. 创建一个新topic
    > bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    
  2. 命令“describe topics”,查看集群
    > bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
    Topic: my-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,0
    
    其中:第一行是所有分区的摘要,其次,每一行提供一个分区信息(因为我们只有一个分区,所以只有一行)。
    • Leader”:该节点负责该分区的所有的读和写(每个节点的leader都是随机选择的)。
    • Replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
    • Isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader。
  3. 发布信息
    > bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
     ...
    my test message 1
    my test message 2
    ^C
    
  4. 消费消息
    > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
     ...
    my test message 1
    my test message 2
    ^C
    
  5. 集群容错测试
    1. kill掉leader,Broker1作为当前的leader,也就是kill掉Broker1。
      > ps | grep server-1.properties
      7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java... 
      > kill -9 7564
      
      • 在Windows上使用:
        > wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid
        ProcessId
        6016
        > taskkill /pid 6016 /f
        
    2. 命令“describe topics”,查看集群:
      > binbin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
      Topic:my-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
      Topic: my-replicated-topic    Partition: 0    Leader: 2    Replicas: 1,2,0    Isr: 2,0
      
      备份节点之一成为新的leader,而broker1已经不在同步备份集合里了。
    3. 消费消息:
      > bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic
      ...
      my test message 1
      my test message 2
      ^C
      
      如上,消息仍然没丢。

推荐配置

最重要的 producer 配置控制:

  • 压缩
  • 同步生产 vs 异步生产
  • 批处理大小(异步生产)


最重要的 consumer 配置:

  • 获取消息的大小

生产者服务器配置

服务器生产服务器配置:

# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000

controller.socket.timeout.ms=30000
controller.message.queue.size=10

# Log configuration
num.partitions=8
message.max.bytes=1000000
auto.create.topics.enable=true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=168
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824

# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000

# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100


新版本推荐:

# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]

# Log configuration
num.partitions=8
default.replication.factor=3
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]

# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]

Java版本

从安全的角度,我们推荐你使用最新的发布版本 JDK 1.8,旧版本已经公开披露了一些安全漏洞,LinkedIn 现在正在运行的是JDK 1.8 u5(希望升级到新版本)使用 G1 收集器。

如果你想在在 JDK 1.7 使用G1收集器(当前默认),请确保在 u51 或更高的版本,LinkedIn 尝试在 u21 测试,但该版本存在大量 G1 执行的问题。

LinkedIn的配置如下:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80


供参考,下面是关于 LinkedIn 的繁忙集群 (高峰) 之一的统计:

  • 60 brokers
  • 50k 分区 (副本 2)
  • 800k 消息/秒
  • 300 MB/sec的入站, 1 GB/秒+ 出站

这个调整看来相当激进, 但是集群中的有 90% 的 GC 暂停时间大约是 21ms, 以及每秒小于 1 个的年轻代 GC。

配置

Broker

基本配置如下:

  • broker.id
  • log.dirs
  • zookeeper.connect

详细配置:

名称 描述 类型 默认 有效值 重要程度 更新模式
kafka >= 0.10版
zookeeper.connect zookeeper host string string
advertised.host.name 过时的:当advertised.listeners或listeners没设置时候才使用。
  • 请改用advertised.listeners。Hostname发布到Zookeeper供客户端使用。
  • 在IaaS环境中,Broker可能需要绑定不同的接口。
  • 如果没有设置,将会使用host.name(如果配置了)。否则将从java.net.InetAddress.getCanonicalHostName()获取。
string null
advertised.listeners 发布到Zookeeper供客户端使用监听(如果不同)。在IaaS环境中,broker可能需要绑定不同的接口。如果没设置,则使用listeners。 string null
advertised.port 过时的:当advertised.listeners或listeners没有设置才使用。
  • 请改用advertised.listeners。
  • 端口发布到Zookeeper供客户端使用,在IaaS环境中,broker可能需要绑定到不同的端口。
  • 如果没有设置,将和broker绑定的同一个端口。
int null
auto.create.topics.enable 启用自动创建topic boolean true
auto.leader.rebalance.enable 启用自动平衡leader。如果需要,后台线程会定期检查并触发leader平衡。 boolean true
background.threads 用于各种后台处理任务的线程数 int 10 [1,...]
broker.id 服务器的broker id。
  • 如果未设置,将生成一个独一无二的broker id。

要避免zookeeper生成的broker id和用户配置的broker id冲突,从reserved.broker.max.id + 1开始生成。

int -1
compression.type 为给定topic指定最终的压缩类型。支持标准的压缩编码器('gzip', 'snappy', 'lz4')。也接受'未压缩',就是没有压缩。保留由producer设置的原始的压缩编码。 string producer
delete.topic.enable 启用删除topic。如果此配置已关闭,通过管理工具删除topic将没有任何效果 boolean false
host.name 过时的:当listeners没有设置才会使用。
  • 请改用listeners。
  • 如果设置,它将只绑定到此地址。如果没有设置,它将绑定到所有接口
string ""
leader.imbalance.check.interval.seconds 由控制器触发分区再平衡检查的频率 long 300
leader.imbalance.per.broker.percentage 允许每个broker的leader比例不平衡。如果每个broker的值高于此值,控制器将触发leader平衡,该值以百分比的形式指定。 int 10
listeners 监听列表 - 监听逗号分隔的URL列表和协议。
  • 指定hostname为0.0.0.0绑定到所有接口,将hostname留空则绑定到默认接口。
  • 合法的listener列表是:
    PLAINTEXT://myhost:9092,TRACE://:9091
    PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093
string null
log.dir 保存日志数据的目录 (补充log.dirs属性) string /tmp/kafka-logs
log.dirs 保存日志数据的目录。如果未设置,则使用log.dir中的值 string null
log.flush.interval.messages 消息刷新到磁盘之前,累计在日志分区的消息数 long 9223372036854775807 [1,...]
log.flush.interval.ms topic中的消息在刷新到磁盘之前保存在内存中的最大时间(以毫秒为单位),如果未设置,则使用log.flush.scheduler.interval.ms中的值 null
log.flush.offset.checkpoint.interval.ms 我们更新的持续记录的最后一次刷新的频率。作为日志的恢复点。 int 60000 [0,...]
log.flush.scheduler.interval.ms 日志刷新的频率(以毫秒为单位)检查是否有任何日志需要刷新到磁盘 long 9223372036854775807
log.retention.bytes 删除日志之前的最大大小 long -1
log.retention.hours 删除日志文件保留的小时数(以小时为单位)。第三级是log.retention.ms属性 int 168
log.retention.minutes 删除日志文件之前保留的分钟数(以分钟为单位)。次于log.retention.ms属性。如果没设置,则使用log.retention.hours的值。 int null
log.retention.ms 删除日志文件之前保留的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes的值。 long null
log.roll.hours 新建一个日志段的最大时间(以小时为单位),次于log.roll.ms属性 int 168 [1,...]
log.roll.jitter.hours 从logRollTimeMillis(以小时为单位)减去最大抖动,次于log.roll.jitter.ms属性。 int 0 [0,...]
log.roll.ms 新建一个日志段之前的最大事时间(以毫秒为单位)。如果未设置,则使用log.roll.hours的值。 long null
log.segment.bytes 单个日志文件的最大大小 int 1073741824 [14,...]
log.segment.delete.delay.ms 从文件系统中删除文件之前的等待的时间 long 60000 [0,...]
message.max.bytes 服务器可以接收的消息的最大大小 int 1000012 [0,...]
min.insync.replicas 当producer设置acks为"all"(或"-1")时。min.insync.replicas指定必须应答成功写入的replicas最小数。
  • 如果不能满足最小值,那么producer抛出一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
  • 当一起使用时,min.insync.replicas和acks提供最大的耐用性保证。
    一个典型的场景是创建一个复制因子3的topic,设置min.insync.replicas为2,并且ack是“all”。
  • 如果多数副本没有接到写入时,将会抛出一个异常。
int 1 [1,...]
num.io.threads 服务器用于执行网络请求的io线程数 int 8 [1,...]
num.network.threads 服务器用于处理网络请求的线程数。 int 3 [1,...]
num.recovery.threads.per.data.dir 每个数据的目录线程数,用于启动时日志恢复和关闭时flush。 int 1 [1,...]
num.replica.fetchers 从源broker复制消息的提取线程数。递增该值可提高 follower broker的I/O的并发。 int 1
offset.metadata.max.bytes offset提交关联元数据条目的最大大小 int 4096
offsets.commit.required.acks commit之前需要的应答数,通常,不应覆盖默认的(-1) short -1
offsets.commit.timeout.ms Offset提交延迟,直到所有副本都收到提交或超时。 这类似于生产者请求超时。 int 5000 [1,...]
offsets.load.buffer.size 当加载offset到缓存时,从offset段读取的批量大小。 int 5242880 [1,...]
offsets.retention.check.interval.ms 检查过期的offset的频率。 long 600000 [1,...]
offsets.retention.minutes offset topic的日志保留时间(分钟) int 1440 [1,...]
offsets.topic.compression.codec 压缩编码器的offset topic - 压缩可以用于实现“原子”提交 int 0
offsets.topic.num.partitions offset commit topic的分区数(部署之后不应更改) int 50 [1,...]
offsets.topic.replication.factor offset topic复制因子(ps:就是备份数,设置的越高来确保可用性)。

为了确保offset topic有效的复制因子,第一次请求offset topic时,活的broker的数量必须最少最少是配置的复制因子数。 如果不是,offset topic将创建失败或获取最小的复制因子(活着的broker,复制因子的配置)

short 3 [1,...]
offsets.topic.segment.bytes offset topic段字节应该相对较小一点,以便于加快日志压缩和缓存加载 int 104857600 [1,...]
port 过时的:当listener没有设置才使用。请改用listeners。该port监听和接收连接。 int 9092
queued.max.requests 在阻塞网络线程之前允许的排队请求数 int 500 [1,...]
quota.consumer.default 过时的:当默认动态的quotas没有配置或在Zookeeper时。如果每秒获取的字节比此值高,所有消费者将通过clientId/consumer区分限流。 long 9223372036854775807 [1,...]
quota.producer.default 过时的:当默认动态的quotas没有配置,或在zookeeper时。如果生产者每秒比此值高,所有生产者将通过clientId区分限流。 long 9223372036854775807 [1,...]
replica.fetch.min.bytes Minimum 每个获取响应的字节数。如果没有满足字节数,等待replicaMaxWaitTimeMs。 int 1
replica.fetch.wait.max.ms 跟随者副本发出每个获取请求的最大等待时间,此值应始终小于replica.lag.time.max.ms,以防止低吞吐的topic的ISR频繁的收缩。 int 500
replica.high.watermark.checkpoint.interval.ms 达到高“水位”保存到磁盘的频率。 long 5000
replica.lag.time.max.ms 如果一个追随者没有发送任何获取请求或至少在这个时间的这个leader的没有消费完。该leader将从isr中移除这个追随者。 long 10000
replica.socket.receive.buffer.bytes 用于网络请求的socket接收缓存区 int 65536
replica.socket.timeout.ms 网络请求的socket超时,该值最少是replica.fetch.wait.max.ms int 30000
request.timeout.ms 该配置控制客户端等待请求的响应的最大时间。

如果超过时间还没收到消费。客户端将重新发送请求,如果重试次数耗尽,则请求失败。

int 30000
socket.receive.buffer.bytes socket服务的SO_RCVBUF缓冲区。如果是-1,则默认使用OS的。 int 102400
socket.request.max.bytes socket请求的最大字节数 int 104857600 [1,...]
socket.send.buffer.bytes socket服务的SO_SNDBUF缓冲区。如果是-1,则默认使用OS的。 int 102400
unclean.leader.election.enable 是否启用不在ISR中的副本参与选举leader的最后的手段。这样做有可能丢失数据。 boolean true
zookeeper.connection.timeout.ms 连接zookeeper的最大等待时间,如果未设置,则使用zookeeper.session.timeout.ms。 int null
zookeeper.session.timeout.ms Zookeeper会话的超时时间 int 6000
zookeeper.set.acl 设置客户端使用安全的ACL boolean false
broker.id.generation.enable 启用自动生成broker id。启用该配置时应检查reserved.broker.max.id。 boolean true
broker.rack broker机架,用于机架感知副本分配的失败容错。例如:RACK1, us-east-1d string null
connections.max.idle.ms Idle 连接超时:闲置时间超过该设置,则服务器socket处理线程关闭这个连接。 long 600000
controlled.shutdown.enable 启用服务器的关闭控制。 boolean true
controlled.shutdown.max.retries 控制因多种原因导致的shutdown失败,当这样失败发生,尝试重试的次数 int 3
controlled.shutdown.retry.backoff.ms 在每次重试之前,系统需要时间从导致先前故障的状态(控制器故障转移,复制延迟等)恢复。 此配置是重试之前等待的时间数。 long 5000
controller.socket.timeout.ms 控制器到broker通道的sockt超时时间 int 30000
default.replication.factor 自动创建topic的默认的副本数 int 1
fetch.purgatory.purge.interval.requests 拉取请求清洗间隔(请求数) int 1000
group.max.session.timeout.ms 已注册的消费者允许的最大会话超时时间,设置的时候越长使消费者有更多时间去处理心跳之间的消息。但察觉故障的时间也拉长了。 int 300000
group.min.session.timeout.ms 已经注册的消费者允许最小的会话超时时间,更短的时间去快速的察觉到故障,代价是频繁的心跳,这可能会占用大量的broker资源。 int 6000
inter.broker.protocol.version 指定broker内部通讯使用的版本。通常在更新broker时使用。有效的值为:0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1。查看ApiVersion找到的全部列表。 string 0.10.1-IV2
log.cleaner.backoff.ms 当没有日志要清理时,休眠的时间 long 15000 [0,...]
log.cleaner.dedupe.buffer.size 用于日志去重的内存总量(所有cleaner线程) long 134217728
log.cleaner.delete.retention.ms 删除记录保留多长时间? long 86400000
log.cleaner.enable 在服务器上启用日志清洗处理?如果使用的任何topic的cleanup.policy=compact包含内部的offset topic,应启动。如果禁用,那些topic将不会被压缩并且会不断的增大。 boolean true
log.cleaner.io.buffer.load.factor 日志cleaner去重缓冲负载因子。去重缓冲区的百分比,较高的值将允许同时清除更多的日志,但将会导致更多的hash冲突。 double 0.9
log.cleaner.io.buffer.size 所有日志清洁器线程I/O缓存的总内存 int 524288 [0,...]
log.cleaner.io.max.bytes.per.second 日志清理器限制,以便其读写i/o平均小与此值。 double 1.7976931348623157E308
log.cleaner.min.cleanable.ratio 脏日志与日志的总量的最小比率,以符合清理条件 double 0.5
log.cleaner.min.compaction.lag.ms 一条消息在日志保留不压缩的最小时间,仅适用于正在压缩的日志。 long 0
log.cleaner.threads 用于日志清除的后台线程数 int 1 [0,...]
log.cleanup.policy 超过保留时间段的默认清除策略。逗号分隔的有效的策略列表。有效的策略有:“delete”和“compact” list [delete] [compact, delete]
log.index.interval.bytes 添加一个条目到offset的间隔 int 4096(4 kibibytes) [0,...]
log.index.size.max.bytes offset index的最大大小(字节) int 10485760 [4,...]
log.message.format.version 指定追加到日志中的消息格式版本。例如: 0.8.2, 0.9.0.0, 0.10.0。

通过设置一个特定消息格式版本,用户需要保证磁盘上所有现有的消息小于或等于指定的版本。错误的设置将导致旧版本的消费者中断,因为消费者接收一个不理解的消息格式。

string 0.10.1-IV2
log.message.timestamp.difference.max.ms 如果log.message.timestamp.type=CreateTime,broker接收消息时的时间戳和消息中指定的时间戳之间允许的最大差异。
  • 如果时间戳超过此阈值,则消息将被拒绝。如果log.message.timestamp.type=LogAppendTime,则此配置忽略。
long 9223372036854775807 [0,...]
log.message.timestamp.type 定义消息中的时间戳是消息创建时间或日志追加时间。该值可设置为CreateTime 或 LogAppendTime string CreateTime [CreateTime, LogAppendTime]
log.preallocate 在创建新段时预分配文件?如果你在Windowns上使用kafka,你可能需要设置它为true。 boolean false
log.retention.check.interval.ms 日志清除程序检查日志是否满足被删除的频率(以毫秒为单位) long 300000 [1,...]
max.connections.per.ip 允许每个ip地址的最大连接数。 int 2147483647 [1,...]
max.connections.per.ip.overrides per-ip或hostname覆盖默认最大连接数 string ""
num.partitions topic的默认分区数 int 1 [1,...]
principal.builder.class 实现PrincipalBuilder接口类的完全限定名,该接口目前用于构建与SSL SecurityProtocol连接的Principal。 class class org。apache。kafka。common。security。auth。DefaultPrincipalBuilder(wiki傻逼编辑器:。换位.)  
producer.purgatory.purge.interval.requests 生产者请求purgatory的清洗间隔(请求数) int 1000
replica.fetch.backoff.ms 当拉取分区发生错误时休眠的时间 int 1000 [0,...]
replica.fetch.max.bytes 拉取每个分区的消息的字节数。这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 int 1048576 [0,...]
replica.fetch.response.max.bytes 预计整个获取响应的最大字节数,这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。

通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。

int 10485760 [0,...]
reserved.broker.max.id broker.id的最大数 int 1000 [0,...]
sasl.enabled.mechanisms 可用的SASL机制列表,包含任何可用的安全提供程序的机制。默认情况下只有GSSAPI是启用的。 list [GSSAPI]
sasl.kerberos.kinit.cmd Kerberos kinit 命令路径。 string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 登录线程在刷新尝试的休眠时间。 long 60000
sasl.kerberos.principal.to.local.rules principal名称映射到一个短名称(通常是操作系统用户名)。按顺序,使用与principal名称匹配的第一个规则将其映射其到短名称。忽略后面的规则。

默认情况下,{username}/{hostname}@{REALM} 映射到 {username}。

list [DEFAULT]
sasl.kerberos.service.name Kafka运行的Kerberos principal名称。可以在JAAS或Kafka的配置文件中定义。 string null
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动的百分比 time. double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程休眠,直到从上次刷新到ticket的到期的时间已到达(指定窗口因子),在此期间它将尝试更新ticket。 double 0.8
sasl.mechanism.inter.broker.protocol SASL机制,用于broker之间的通讯,默认是GSSAPI。 string GSSAPI
security.inter.broker.protocolSecurity broker之间的通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 string PLAINTEXT
ssl.cipher.suites 密码套件列表。认证,加密,MAC和秘钥交换算法的组合,用于使用TLS或SSL的网络协议交涉网络连接的安全设置,默认情况下,支持所有可用的密码套件。 list null
ssl.client.auth 配置请求客户端的broker认证。

常见的设置:

  • “ssl.client.auth=required”需要客户端认证。
  • “ssl.client.auth=requested”客户端认证可选,不同于requested,客户端可选择不提供自身的身份验证信息。
  • “ssl.client.auth=none”不需要客户端身份认证
string none [required, requested, none]
ssl.enabled.protocols 已启用的SSL连接协议列表。 list [TLSv1.2, TLSv1.1, TLSv1]
ssl.key.password 秘钥库文件中的私钥密码。对客户端是可选的。 password null
ssl.keymanager.algorithm 用于SSL连接的密钥管理工厂算法。默认值是Java虚拟机的密钥管理工厂算法。 string SunX509
ssl.keystore.location 密钥仓库文件的位置。客户端可选,并可用于客户端的双向认证。 string null
ssl.keystore.password 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 password null
ssl.keystore.type 密钥仓库文件的格式。客户端可选。 string JKS
ssl.protocol 用于生成SSLContext,默认是TLS,适用于大多数情况。允许使用最新的JVM,LS, TLSv1.1 和TLSv1.2。 SSL,SSLv2和SSLv3 老的JVM也可能支持,由于有已知的安全漏洞,不建议使用。 string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。默认值是JVM的安全程序。 string null
ssl.trustmanager.algorithm 信任管理工厂用于SSL连接的算法。默认为Java虚拟机配置的信任算法。 string PKIX
ssl.truststore.location 信任仓库文件的位置 string null
ssl.truststore.password 信任仓库文件的密码 password null
ssl.truststore.type 信任仓库文件的文件格式 string JKS
authorizer.class.name 用于认证的授权程序类 string ""
metric.reporters 度量报告的类列表,通过实现MetricReporter接口,允许插入新度量标准类。JmxReporter包含注册JVM统计。 list []
metrics.num.samples 维持计算度量的样本数。 int 2 [1,...]
metrics.sample.window.ms 计算度量样本的时间窗口 long 30000 [1,...]
quota.window.num 在内存中保留客户端限额的样本数 int 11 [1,...]
quota.window.size.seconds 每个客户端限额的样本时间跨度 int 1 [1,...]
replication.quota.window.num 在内存中保留副本限额的样本数 int 11 [1,...]
replication.quota.window.size.seconds 每个副本限额样本数的时间跨度 int 1 [1,...]
ssl.endpoint.identification.algorithm 端点身份标识算法,使用服务器证书验证服务器主机名。 string null
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
zookeeper.sync.time.ms ZK follower可落后与leader多久。 int 2000
以下是kafka新版本的增量配置
kafka >= 1.0
group.initial.rebalance.delay.ms 分组协调器在执行第一次重新平衡之前,等待更多消费者加入新组的时间。延迟时间越长,意味着重新平衡的次数可能越少,但会增加处理开始前的时间。 int 3000 只读
transaction.abort.timed.out.transaction.cleanup.interval.ms 回滚已超时的事务的时间间隔。 int 10000 (10 seconds) [1,...] 只读
transaction.remove.expired.transaction.cleanup.interval.ms 删除因transactional.id.expiration.ms过期的事务的时间间隔。 int 3600000 (1 hour) [1,...] 只读
transaction.max.timeout.ms 事务的最大允许超时时间。如果客户端请求的交易时间超过了这个时间,那么broker将在InitProducerIdRequest中返回一个错误。

这可以防止客户端的超时时间过大,从而阻滞消费者从事务中包含的主题中读取。

int 900000 (15 minutes) [1,...] 只读
transaction.state.log.load.buffer.size 在将生产者id和事务加载到缓存中时,从事务日志段读取的批次大小(软限制,如果消息太大,则重写) int 5242880 [1,...] 只读
transaction.state.log.min.isr 覆盖事务topic的min.insync.replicas配置。 int 2 [1,...] 只读
transaction.state.log.num.partitions 事务topic的分区数(部署后不应改变)。 int 50 [1,...] 只读
transaction.state.log.replication.factor 事务topic的复制因子(设置较高来确保可用性)。内部topic创建将失败,直到集群规模满足该复制因子要求。 short 3 [1,...] 只读
transaction.state.log.segment.bytes 事务topic段的字节数应保持相对较小,以利于加快日志压缩和缓存加载速度 int 104857600 (100 mebibytes) [1,...] 只读
transactional.id.expiration.ms 事务协调器在没有收到当前事务的任何事务状态更新的情况下,在其事务id过期前等待的时间,单位为ms。

这个设置也会影响生产者id过期:一旦这个时间在给定的生产者id最后一次写入后过去,生产者id就会过期。

  • 请注意,如果由于主题的保留设置而删除了生产者id的最后一次写入,那么生产者id可能会更快过期。
int 604800000 (7 days) [1,...] 只读
kafka >= 2.0
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null 中间 只读
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。

例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class null 中间 只读
sasl.login.class 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。

例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

class null 中间 只读
kafka >= 2.5
zookeeper.clientCnxnSocket 当使用TLS连接到ZooKeeper时,通常设置为org.apache.zookeeper.ClientCnxnSocketNetty。

覆盖任何同名的zookeeper.clientCnxnSocket设置的显式值。

string null 中间 只读
zookeeper.ssl.client.enable 设置客户端连接到ZooKeeper时使用TLS。

显式的值会覆盖任何通过zookeeper.client.secure设置的值(注意名称不同)。 如果两者都没有设置,默认为false; 当为true时,必须设置zookeeper.clientCnxnSocket(通常为org.apache.zookeeper.ClientCnxnSocketNetty); 其他需要设置的值可能包括zookeeper.ssl.cipher.suites、zookeeper.ssl.crl.enable、zookeeper.ssl.enabled.protocols、zookeeper.ssl.endpoint. identification.algorithm,zookeeper.ssl.keystore.location,zookeeper.ssl.keystore.password,zookeeper.ssl.keystore.type,zookeeper.ssl. ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type。

boolean false 中间 只读
zookeeper.ssl.keystore.location 当使用客户端证书与TLS连接到ZooKeeper时的keystore位置。

覆盖任何通过zookeeper.ssl.keyStore.location系统属性设置的显式值(注意是驼峰大小)。

password null 中间 只读
zookeeper.ssl.keystore.password 当使用客户端证书与TLS连接到ZooKeeper时的keystore密码。覆盖任何通过`zookeeper.ssl.keyStore.password系统属性设置的显式值(注意驼峰大写)。 注意,ZooKeeper不支持与keystore密码不同的密钥密码,所以一定要将keystore中的密钥密码设置为与keystore密码相同,否则连接Zookeeper的尝试将失败。 password null 中间 只读
zookeeper.ssl.keystore.type 当使用客户端证书与TLS连接到ZooKeeper时的keystore类型。覆盖任何通过zookeeper.ssl.keyStore.type系统属性设置的显式值(注意骆驼大写)。

默认值为null意味着该类型将根据keystore的文件扩展名自动检测。

string null 中间 只读
zookeeper.ssl.protocol 指定ZooKeeper TLS协商中使用的协议。

一个显式的值会覆盖任何通过同名的zookeeper.ssl.protocol系统设置的值。

string TLSv1.2 只读
zookeeper.ssl.cipher.suites 指定在ZooKeeper TLS协商中使用的密码套件(csv),覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意单字 "ciphersuites")。

覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意 "ciphersuites "这个单字)。 默认值为 "null "意味着启用的密码套件列表是由正在使用的Java运行时决定的。

boolean false 只读
zookeeper.ssl.crl.enable 指定是否启用ZooKeeper TLS协议中的证书撤销列表。

覆盖任何通过zookeeper.ssl.crl系统属性设置的显式值(注意是短名)。

boolean false 只读
zookeeper.ssl.enabled.protocols 指定ZooKeeper TLS协商(csv)中启用的协议。

覆盖任何通过zookeeper.ssl.enabledProtocols系统属性设置的显式值(注意骆驼大写)。 默认值为 "null "意味着启用的协议将是zookeeper.ssl.protocol配置属性的值。

list null 只读
zookeeper.ssl.endpoint.identification.algorithm 指定是否在ZooKeeper TLS协商过程中启用主机名验证,(不区分大小写)"https "表示启用ZooKeeper主机名验证,显式的空白值表示禁用(仅为测试目的建议禁用)。

明确的值会覆盖任何通过zookeeper.ssl.hostnameVerification系统属性设置的 "true "或 "false "值(注意不同的名称和值;true意味着https,false意味着空白)。

string HTTPS 只读
zookeeper.ssl.ocsp.enable 指定是否启用ZooKeeper TLS协议中的在线证书状态协议。

覆盖任何通过zookeeper.ssl.ocsp系统属性设置的显式值(注意是短名)。

boolean false 只读
kafka >= 2.7
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。

默认的SSL引擎工厂只支持带X.509证书的PEM格式。

password null 中间 每个broker
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。

连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。

long 127000 (127 seconds) 中间 只读
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。

如果在超时之前没有建立连接,客户端将关闭socket通道。

long 10000 (10 seconds) 中间 只读

Topic

与topic相关的配置,服务器的默认值,也可选择的覆盖指定的topic。

  • 如果没有给出指定topic的配置,则将使用服务器默认值。
  • 可以通过“-config”选项在topic创建时设置。


此示例使用自定义最大消息的大小和刷新率,创建一个名为 my-topic 的topic:

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

也可以使用“alter configs”命令修改或设置。 此示例修改更新 my-topic 的最大的消息大小:

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
    --alter --add-config max.message.bytes=128000

可以执行以下命令验证结果:

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe

移除设置:

> bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes


详细配置:

名称 描述 类型 默认 有效值 服务器默认属性 更新模式
cleanup.policy “delete”或“compact”。指定在旧的日志段的保留策略。

默认策略(“delete”),将达到保留时间或大小限制的日志废弃。 “compact”则压缩日志。

list delete [compact, delete] log.cleanup.policy
compression.type 针对指定的topic设置最终的压缩方式。

标准的压缩格式有'gzip', 'snappy', lz4。还可以设置'uncompressed',就是不压缩;设置为'producer'这意味着保留生产者设置的原始压缩编解码。

string producer [uncompressed, snappy, lz4, gzip, producer] compression.type
delete.retention.ms 保留删除消息压缩topic的删除标记的时间。此设置还给出消费者如果从offset 0开始读取并确保获得最终阶段的有效快照的时间范围(否则,在完成扫描之前可能已经回收了)。 long 86400000 [0,...] log.cleaner.delete.retention.ms
file.delete.delay.ms 从文件系统中删除文件之前等待的时间 long 60000 [0,...] log.segment.delete.delay.ms
flush.messages 此设置允许指定我们强制fsync写入日志的数据的间隔。

例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。

  • 一般,我们建议不要设置它,使用复制特性来保持持久性,并允许操作系统的后台刷新功能更高效。可以在每个topic的基础上覆盖此设置(请参阅每个主题的配置部分)。
long 9223372036854775807 [0,...] log.flush.interval.messages
flush.ms 此设置允许我们强制fsync写入日志的数据的时间间隔。

例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。

  • 一般,我们建议不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能,因为它更有效率
long 9223372036854775807 [0,...] log.flush.interval.ms
follower.replication.throttled.replicas follower复制限流列表。

该列表应以[PartitionId]的形式描述一组副本:[BrokerId],[PartitionId]:[BrokerId]:...或者通配符'*'可用于限制此topic的所有副本。

list "" [partitionId],[brokerId]:[partitionId],[brokerId]:... follower.replication.throttled.replicas
index.interval.bytes 此设置控制Kafka向其offset索引添加索引条目的频率。默认设置确保我们大致每4096个字节索引消息。

更多的索引允许读取更接近日志中的确切位置,但使索引更大。你不需要改变这个值。

int 4096 [0,...] log.index.interval.bytes
leader.replication.throttled.replicas 在leader方面进行限制的副本列表。

该列表设置以[PartitionId]的形式描述限制副本:[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...或使用通配符‘*’限制该topic的所有副本。

list "" [partitionId],[brokerId]:[partitionId],[brokerId]:... leader.replication.throttled.replicas
max.message.bytes kafka允许的最大的消息批次大小。

如果增加此值,并且消费者的版本比0.10.2老,那么消费者的提取的大小也必须增加,以便他们可以获取大的消息批次。 在最新的消息格式版本中,消息总是分组批量来提高效率。 在之前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个消息。

int 1000012 [0,...] message.max.bytes
message.format.version 指定消息附加到日志的消息格式版本。

该值应该是一个有效的ApiVersion。例如:0.8.2, 0.9.0.0, 0.10.0,更多细节检查ApiVersion。 通过设置特定的消息格式版本,并且磁盘上的所有现有消息都小于或等于指定版本。 不正确地设置此值将导致消费者使用旧版本,因为他们将接收到“不认识”的格式的消息。

string 0.11.0-IV2 log.message.format.version
min.cleanable.dirty.ratio 此配置控制日志压缩程序将尝试清除日志的频率(假设启用了日志压缩)。

默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。 更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。

double 0.5 [0,...,1] log.cleaner.min.cleanable.ratio
min.compaction.lag.ms 消息在日志中保持不压缩的最短时间。

仅适用于正在压缩的日志。

long 0 [0,...] log.cleaner.min.compaction.lag.ms
min.insync.replicas 当生产者设置应答为"all"(或“-1”)时,此配置指定了成功写入的副本应答的最小数。

如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend) 当min.insync.replicas和acks强制更大的耐用性时。 典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。 如果多数副本没有收到写入,这将确保生产者引发异常。

int 1 [1,...] min.insync.replicas
preallocate 如果我们在创建新的日志段时在磁盘上预分配该文件,那么设为True。 boolean false log.preallocate
retention.bytes 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧的日志段以释放空间。

默认情况下,没有设置大小限制则仅限于时间限制。

long -1 log.retention.bytes
retention.ms 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。

这代表SLA消费者必须读取数据的时间长度。

long 604800000 log.retention.ms
segment.bytes 此配置控制日志的段文件大小。一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 int 1073741824 [14,...] log.segment.bytes
segment.index.bytes 此配置控制offset映射到文件位置的索引的大小。我们预先分配此索引文件,并在日志滚动后收缩它。通常不需要更改此设置。 int 10485760 [0,...] log.index.size.max.bytes
segment.jitter.ms 从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动 long 0 [0,...] log.roll.jitter.ms
segment.ms 此配置控制Kafka强制日志滚动的时间段,以确保保留可以删除或压缩旧数据,即使段文件未满。 long 604800000 [0,...] log.roll.ms
unclean.leader.election.enable 是否将不在ISR中的副本作为最后的手段选举为leader,即使这样做可能会导致数据丢失。 boolean false unclean.leader.election.enable
kafka > 2.0
message.downconversion.enable 此配置控制是否启用消息格式的向下转换以满足消费请求。

当设置为false时,broker不会对期待旧消息格式的消费者执行向下转换。broker 会对来自此类旧客户端的消费请求作出 UNSUPPORTED_VERSION 错误响应。

  • 这个配置不适用于复制到followers时可能需要的任何消息格式转换。
boolean false log.message.downconversion.enable

Producer

详细配置:

名称 描述 类型 默认 有效值 重要程度
bootstrap.servers host/port列表,用于初始化建立和Kafka集群的连接。

列表格式为host1:port1,host2:port2,....,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(你可以多提供几个,以防提供的服务器关闭)

list
key.serializer 实现 org.apache.kafka.common.serialization.Serializer 接口的 key 的 Serializer 类。 class
value.serializer 实现 org.apache.kafka.common.serialization.Serializer 接口的value 的 Serializer 类。 class
acks 生产者需要leader确认请求完成之前接收的应答数。

此配置控制了发送消息的耐用性,支持以下配置:

  1. acks=0
    如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。
    在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。
  2. acks=1,
    这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。
    在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。
  3. acks=all
    这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本或者)。
    这是最强壮的可用性保障。等价于acks=-1。
string 1 [all, -1, 0, 1]
buffer.memory 生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。

此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。

long 33554432 [0,...]
compression.type 数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。

压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。

string none
retries 设置一个比零大的值,客户端如果发送失败则会重新发送。

注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。 如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。

int 0 [0,...,2147483647]
ssl.key.password 密钥仓库文件中的私钥的密码。 password null
ssl.keystore.location 密钥仓库文件的位置。可用于客户端的双向认证。 string null
ssl.keystore.password 密钥仓库文件的仓库密码。

只有配置了ssl.keystore.location时才需要。

password null
ssl.truststore.location 信任仓库的位置 string null
ssl.truststore.password 信任仓库文件的密码 password null
batch.size 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。

这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位): 不会打包大于此配置大小的消息。 发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。 较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。

int 16384 [0,...]
client.id 当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。 string ""
connections.max.idle.ms 多少毫秒之后关闭闲置的连接。 long 540000
linger.ms 生产者组将发送的消息组合成单个批量请求。

正常情况下,只有消息到达的速度比发送速度快的情况下才会出现。但是,在某些情况下,即使在适度的负载下,客户端也可能希望减少请求数量。此设置通过添加少量人为延迟来实现。- 也就是说,不是立即发出一个消息,生产者将等待一个给定的延迟,以便和其他的消息可以组合成一个批次。这类似于Nagle在TCP中的算法。此设置给出批量延迟的上限:一旦我们达到分区的batch.size值的记录,将立即发送,不管这个设置如何,但是,如果比这个小,我们将在指定的“linger”时间内等待更多的消息加入。此设置默认为0(即无延迟)。假设,设置 linger.ms=5,将达到减少发送的请求数量的效果,但对于在没有负载情况,将增加5ms的延迟。

long 0 [0,...]
max.block.ms 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。

此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。

long 60000 [0,...]
max.request.size 请求的最大大小(以字节为单位)。

此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这也是最大消息批量大小的上限。 请注意,服务器拥有自己的批量大小,可能与此不同。

int 1048576 [0,...]
partitioner.class 实现Partitioner接口的的Partitioner类。 class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。

如果值为-1,则将使用OS默认值。

int 32768 [-1,...]
request.timeout.ms 该配置控制客户端等待请求响应的最长时间。

如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。

int 30000 [0,...]
sasl.jaas.config JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:'(=)*;' password null
sasl.kerberos.service.name Kafka运行的Kerberos主体名称。

可以在Kafka的JAAS配置或Kafka的配置中定义。

string null
sasl.mechanism SASL机制用于客户端连接。这是安全提供者可用与任何机制。

GSSAPI是默认机制。

string GSSAPI
security.protocol 用于与broker通讯的协议。

有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。

string PLAINTEXT
send.buffer.bytes 发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。

如果值为 -1,将默认使用系统的。

int 131072 [-1,...]
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type 密钥存储文件的文件格式。

对于客户端是可选的。

string JKS
ssl.protocol 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。

较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。

string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。 string null
ssl.truststore.type 信任仓库文件的文件格式。 string JKS
enable.idempotence 当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。

即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。 另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。

boolean false
interceptor.classes 实现ProducerInterceptor接口,你可以在生产者发布到Kafka群集之前拦截(也可变更)生产者收到的消息。

默认情况下没有拦截器。

list null
max.in.flight.requests.per.connection 阻塞之前,客户端单个连接上发送的未应答请求的最大数量。

注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。

int 5 [1,...]
metadata.max.age.ms 在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。 long 300000 [0,...]
metric.reporters 用作metrics reporters(指标记录员)的类的列表。

实现MetricReporter接口,将受到新增加的度量标准创建类插入的通知。 JmxReporter始终包含在注册JMX统计信息中。

list ""
metrics.num.samples 维护用于计算度量的样例数量。 int 2 [1,...]
metrics.recording.level 指标的最高记录级别。 string INFO [INFO, DEBUG]
metrics.sample.window.ms 度量样例计算上 long 30000 [0,...]
reconnect.backoff.max.ms 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。

如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。

long 1000 [0,...]
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。 long 50 [0,...]
retry.backoff.ms 尝试重试指定topic分区的失败请求之前等待的时间。

这样可以避免在某些故障情况下高频次的重复发送请求。

long 100 [0,...]
sasl.kerberos.kinit.cmd Kerberos kinit 命令路径。 string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin Login线程刷新尝试之间的休眠时间。 long 60000
sasl.kerberos.ticket.renew.jitter 添加更新时间的随机抖动百分比。 double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。 double 0.8
ssl.cipher.suites 密码套件列表。这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。 list null
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null
ssl.keymanager.algorithm 用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 string SunX509
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
ssl.trustmanager.algorithm 用于SSL连接的信任管理因子算法。

默认值是JAVA虚拟机配置的信任管理工厂算法。

string PKIX
transaction.timeout.ms 生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。

如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。

int 60000
transactional.id 用于事务传递的TransactionalId。

这样可以跨多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。 如果没有提供TransactionalId,则生产者被限制为幂等传递。 请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。

string null non-empty string
kafka >= 2.0.0
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null 中间
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。

对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class null
sasl.login.class 实现Login接口的类的全称。

对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

class null
kafka >= 2.1.0
client.dns.lookup 控制客户端如何使用DNS查询。如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。

一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。

string use_all_dns_ips [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only]
delivery.timeout.ms 调用send()返回后报告成功或失败的时间上限。

这限制了消息在发送前被延迟的总时间,等待broker确认的时间(如果期望的话),以及允许重试发送失败的时间。如果遇到不可恢复的错误,重试次数已经用尽,或者消息被添加到一个达到较早发送到期期限的批次中,生产者可能会报告未能在这个配置之前发送记录。 这个配置的值应该大于或等于request.timeout.ms和linger.ms之和。

int 120000 (2 minutes) [0,...]
kafka >= 2.7
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。

默认的SSL引擎工厂只支持带X.509证书的PEM格式。

password null
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。

为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。

long 127000 (127 seconds)
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。

如果在超时之前没有建立连接,客户端将关闭socket通道。

long 10000 (10 seconds)

Consumer

在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者。新老客户端的配置如下。

新消费者配置:

名称 描述 类型 默认 有效值 重要程度
bootstrap.servers host/port,用于和kafka集群建立初始化连接。

因为这些服务器地址仅用于初始化连接,并通过现有配置的来发现全部的kafka集群成员(集群随时会变化),所以此列表不需要包含完整的集群地址(但尽量多配置几个,以防止配置的服务器宕机)。

list
key.deserializer key的解析序列化接口实现类(Deserializer)。 class
value.deserializer value的解析序列化接口实现类(Deserializer) class
fetch.min.bytes 服务器哦拉取请求返回的最小数据量,如果数据不足,请求将等待数据积累。

默认设置为1字节,表示只要单个字节的数据可用或者读取等待请求超时,就会应答读取请求。 将此值设置的越大将导致服务器等待数据累积的越长,这可能以一些额外延迟为代价提高服务器吞吐量。

int 1 [0,...]
group.id 此消费者所属消费者组的唯一标识。如果消费者用于订阅或offset管理策略的组管理功能,则此属性是必须的。 string ""
heartbeat.interval.ms 当使用Kafka的分组管理功能时,心跳到消费者协调器之间的预计时间。心跳用于确保消费者的会话保持活动状态,并当有新消费者加入或离开组时方便重新平衡。

该值必须必比session.timeout.ms小,通常不高于1/3。它可以调整的更低,以控制正常重新平衡的预期时间。

int 3000
max.partition.fetch.bytes 服务器将返回每个分区的最大数据量。

如果拉取的第一个非空分区中第一个消息大于此限制,则仍然会返回消息,以确保消费者可以正常的工作。 broker接受的最大消息大小通过message.max.bytes(broker config)或max.message.bytes (topic config)定义。 参阅fetch.max.bytes以限制消费者请求大小。

int 1048576 [0,...]
session.timeout.ms 用于发现消费者故障的超时时间。

消费者周期性的发送心跳到broker,表示其还活着。如果会话超时期满之前没有收到心跳,那么broker将从分组中移除消费者,并启动重新平衡。 请注意,该值必须在broker配置的group.min.session.timeout.ms和group.max.session.timeout.ms允许的范围内。

int 10000
ssl.key.password 密钥存储文件中的私钥的密码。客户端可选 password null
ssl.keystore.location 密钥存储文件的位置, 这对于客户端是可选的,并且可以用于客户端的双向认证。 string null
ssl.keystore.password 密钥仓库文件的仓库密码。

客户端可选,只有ssl.keystore.location配置了才需要。

password null
ssl.truststore.location 信任仓库文件的位置 string null
ssl.truststore.password 信任仓库文件的密码 password null
auto.offset.reset 当Kafka中没有初始offset或如果当前的offset不存在时(例如,该数据被删除了),该怎么办。
  1. 最早:自动将偏移重置为最早的偏移
  2. 最新:自动将偏移重置为最新偏移
  3. none:如果消费者组找到之前的offset,则向消费者抛出异常
  4. 其他:抛出异常给消费者。
string latest [latest, earliest, none]
connections.max.idle.ms 指定在多少毫秒之后关闭闲置的连接 long 540000
enable.auto.commit 如果为true,消费者的offset将在后台周期性的提交 boolean true
exclude.internal.topics 内部topic的记录(如偏移量)是否应向消费者公开。如果设置为true,则从内部topic接受记录的唯一方法是订阅它。 boolean true
fetch.max.bytes 服务器为拉取请求返回的最大数据值。

这不是绝对的最大值,如果在第一次非空分区拉取的第一条消息大于该值,该消息将仍然返回,以确保消费者继续工作。 接收的最大消息大小通过message.max.bytes (broker config) 或 max.message.bytes (topic config)定义。 注意,消费者是并行执行多个提取的。

int 52428800 [0,...]
max.poll.interval.ms 使用消费者组管理时poll()调用之间的最大延迟。消费者在获取更多记录之前可以空闲的时间量的上限。

如果此超时时间期满之前poll()没有调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

int 300000 [1,...]
max.poll.records 在单次调用poll()中返回的最大记录数。 int 500 [1,...]
partition.assignment.strategy 当使用组管理时,客户端将使用分区分配策略的类名来分配消费者实例之间的分区所有权 list class org.apache.kafka.clients.consumer.RangeAssignor
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。

如果值为-1,则将使用OS默认值。

int 65536 [-1,...]
request.timeout.ms 配置控制客户端等待请求响应的最长时间。

如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽则客户端将重新发送请求。

int 305000 [0,...]
sasl.jaas.config JAAS配置文件中SASL连接登录上下文参数。

这里描述JAAS配置文件格式。该值的格式为: '(=)*;'

password null
sasl.kerberos.service.name Kafka运行Kerberos principal名。可以在Kafka的JAAS配置文件或在Kafka的配置文件中定义。 string null
sasl.mechanism 用于客户端连接的SASL机制。安全提供者可用的机制。GSSAPI是默认机制。 string GSSAPI
security.protocol 用于与broker通讯的协议。

有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。

string PLAINTEXT
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。

如果值为-1,则将使用OS默认值。

int 131072 [-1,...]
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type key仓库文件的文件格式,客户端可选。 string JKS
ssl.protocol 用于生成SSLContext的SSL协议。

默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。

string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。

默认值是JVM的默认安全提供程序。

string null
ssl.truststore.type 信任存储文件的文件格式。 string JKS
auto.commit.interval.ms 如果enable.auto.commit设置为true,则消费者偏移量自动提交给Kafka的频率(以毫秒为单位)。 int 5000 [0,...]
check.crcs 自动检查CRC32记录的消耗。

这样可以确保消息发生时不会在线或磁盘损坏。 此检查增加了一些开销,因此在寻求极致性能的情况下可能会被禁用。

boolean true
client.id 在发出请求时传递给服务器的id字符串。

这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来跟踪ip/port的请求源。

string ""
fetch.max.wait.ms 如果没有足够的数据满足fetch.min.bytes,服务器将在接收到提取请求之前阻止的最大时间。 int 500 [0,...]
interceptor.classes 用作拦截器的类的列表。

你可实现ConsumerInterceptor接口以允许拦截(也可能变化)消费者接收的记录。 默认情况下,没有拦截器。

list null
metadata.max.age.ms 在一定时间段之后(以毫秒为单位的),强制更新元数据,即使没有任何分区领导变化,任何新的broker或分区。 long 300000 [0,...]
metric.reporters 用作度量记录员类的列表。

实现MetricReporter接口以允许插入通知新的度量创建的类。 JmxReporter始终包含在注册JMX统计信息中。

list ""
metrics.num.samples 保持的样本数以计算度量。 int 2 [1,...]
metrics.recording.level 最高的记录级别。 string INFO [INFO, DEBUG]
metrics.sample.window.ms The window of time a metrics sample is computed over. long 30000 [0,...]
reconnect.backoff.ms 尝试重新连接指定主机之前等待的时间,避免频繁的连接主机,这种机制适用于消费者向broker发送的所有请求。 long 50 [0,...]
retry.backoff.ms 尝试重新发送失败的请求到指定topic分区之前的等待时间。

避免在某些故障情况下,频繁的重复发送。

long 100 [0,...]
sasl.kerberos.kinit.cmd Kerberos kinit命令路径。 string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 尝试/恢复之间的登录线程的休眠时间。 long 60000
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程将休眠,直到从上次刷新到ticket的指定的时间窗口因子到期,此时将尝试续订ticket。 double 0.8
ssl.cipher.suites 密码套件列表,用于TLS或SSL网络协议的安全设置,认证,加密,MAC和密钥交换算法的明明组合。

默认情况下,支持所有可用的密码套件。

list null
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null
ssl.keymanager.algorithm 密钥管理器工厂用于SSL连接的算法。

默认值是为Java虚拟机配置的密钥管理器工厂算法。

string SunX509
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
ssl.trustmanager.algorithm 信任管理器工厂用于SSL连接的算法。

默认值是为Java虚拟机配置的信任管理器工厂算法。

string PKIX
kafka >= 2.0.0
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。

对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class null
sasl.login.class 实现Login接口的类的全称。

对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

class null
kafka >= 2.1.0
client.dns.lookup 控制客户端如何使用DNS查询。
  • 如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。
    一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。
  • 如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。
  • 如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。
string use_all_dns_ips [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only]
kafka >= 2.7
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。

默认的SSL引擎工厂只支持带X.509证书的PEM格式。

password null
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。

连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。

long 127000 (127 seconds)
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。

如果在超时之前没有建立连接,客户端将关闭socket通道。

long 10000 (10 seconds)

旧消费者配置:

  • group.id
  • zookeeper.connect
名称 默认值 描述
group.id 标识消费者所属消费者组(独一的)。通过设置相同的组ID,多个消费者表明属于该消费者组的一部分。
zookeeper.connect 指定ZooKeeper连接字符串,格式为hostname:port,其中host和port是ZooKeeper服务器的主机和端口。 为了使ZooKeeper宕机时连接到其他ZooKeeper节点,你还可以以hostname1:host1,hostname2:port2,hostname3:port3的形式指定多个主机。

还可以设置ZooKeeper chroot路径,作为其ZooKeeper连接字符串的一部分,将其数据放置在全局ZooKeeper命名空间中的某个路径下。 如果是这样,消费者应该在其连接字符串中使用相同的chroot路径。 例如,要给出/chroot/path的chroot路径,你需要将该值设置为:hostname1:port1,hostname2:port2,hostname3:port3/chroot/path。

consumer.id null 如果未设置将自动生成。
socket.timeout.ms 30 * 1000 网络请求socker的超时时间。实际的超时是 max.fetch.wait+socket.timeout.ms的时间。
socket.receive.buffer.bytes 64 * 1024 网络请求socker的接收缓存大小
fetch.message.max.bytes 1024 * 1024 每个拉取请求的每个topic分区尝试获取的消息的字节大小。这些字节将被读入每个分区的内存,因此这有助于控制消费者使用的内存。 拉取请求的大小至少与服务器允许的最大消息的大小一样大,否则生产者可能发送大于消费者可以拉取的消息。
num.consumer.fetchers 1 用于拉取数据的拉取线程数。
auto.commit.enable true 如果为true,请定期向ZooKeeper提交消费者已经获取的消息的偏移量。 当进程失败时,将使用这种承诺偏移量作为新消费者开始的位置。
auto.commit.interval.ms 60 * 1000 消费者offset提交到zookeeper的频率(以毫秒为单位)
queued.max.message.chunks 2 消费缓存消息块的最大大小。每个块可以达到fetch.message.max.bytes。
rebalance.max.retries 4 当新的消费者加入消费者组时,消费者集合尝试“重新平衡”负载,并为每个消费者分配分区。如果消费者集合在分配时发生时发生变化,则重新平衡将失败并重试。此设置控制尝试之前的最大尝试次数。
fetch.min.bytes 1 拉取请求返回最小的数据量。如果没有足够的数据,请求将等待数据积累,然后应答请求。
fetch.wait.max.ms 100 如果没有足够的数据(fetch.min.bytes),服务器将在返回请求数据之前阻塞的最长时间。
rebalance.backoff.ms 2000 重新平衡时重试之间的回退时间。如果未设置,则使用zookeeper.sync.time.ms中的值。
refresh.leader.backoff.ms 200 回退时间等待,然后再尝试选举一个刚刚失去leader的分区。
auto.offset.reset largest 如果ZooKeeper中没有初始偏移量,或偏移值超出范围,该怎么办?
  1. 最小:自动将偏移重置为最小偏移
  2. 最大:自动将偏移重置为最大偏移
  3. 其他:抛出异常消费者
consumer.timeout.ms -1 如果在指定的时间间隔后没有消息可用,则向用户发出超时异常
exclude.internal.topics true 来自内部topic的消息(如偏移量)是否应该暴露给消费者。
client.id group id value 客户端ID是每个请求中发送的用户指定的字符串,用于帮助跟踪调用。 它应该逻辑地标识发出请求的应用程序。
zookeeper.session.timeout.ms 6000 ZooKeeper会话超时。如果消费者在这段时间内没有对ZooKeeper心跳,那么它被认为是死亡的,并且会发生重新平衡。
zookeeper.connection.timeout.ms 6000 与zookeeper建立连接时客户端等待的最长时间。
zookeeper.sync.time.ms 2000 ZK follower可以罗ZK leader多久
offsets.storage zookeeper 选择存储偏移量的位置(zookeeper或kafka)。
offsets.channel.backoff.ms 1000 重新连接offset通道或重试失败的偏移提取/提交请求时的回退周期。
offsets.channel.socket.timeout.ms 10000 读取offset拉取/提交响应的Socker的超时时间。此超时也用于查询offset manager的ConsumerMetadata请求。
offsets.commit.max.retries 5 失败时重试偏移提交的最大次数。
  • 此重试计数仅适用于停机期间的offset提交,它不适用于自动提交线程的提交。
  • 它也不适用于在提交offset之前查询偏移协调器的尝试。即如果消费者元数据请求由于任何原因而失败,则将重试它,并且重试不计入该限制。
dual.commit.enabled true 如果使用“kafka”作为offsets.storage,则可以向ZooKeeper(除Kafka之外)进行双重提交offset。

在从基于zookeeper的offset存储迁移到kafka存储的时候可以这么做。 对于任何给定的消费者组,在该组中的所有实例已迁移到提交到broker(而不是直接到ZooKeeper)的新的版本之后,可以关闭这个。

partition.assignment.strategy range 在“range”或“roundrobin”策略之间选择将分区分配给消费者流。

循环分区分配器分配所有可用的分区和所有可用的消费者线程。然后,继续从分区到消费者线程进行循环任务。如果所有消费者实例的订阅是相同的,则分区将被均匀分布。(即,分区所有权计数将在所有消费者线程之间的差异仅在一个delta之内。)

  • 循环分配仅在以下情况下被允许:
    1. (a)每个主题在消费者实例中具有相同数量的流
    2. (b)订阅的topic的对于组内的每个消费者实例都是相同的。

范围(Range)分区基于每个topic。对于每个主题,我们按数字顺序排列可用的分区,并以字典顺序排列消费者线程。然后,我们将分区数除以消费者流(线程)的总数来确定分配给每个消费者的分区数。 如果不均匀分割,那么前几个消费者将会有多的分区。

Streams

Kafka Stream客户端库配置:

名称 描述 类型 默认 有效值 重要程度
application.id 流处理应用程序标识。必须在Kafka集群中是独一无二的。

1)默认客户端ID前缀,2)成员资格管理的group-id,3)changgelog的topic前缀

string
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口列表。

客户端将会连接所有服务器,跟指定哪些服务器无关 - 通过指定的服务器列表会自动发现全部的服务器。 此列表格式host1:port1,host2:port2,...由于这些服务器仅用于初始连接以发现完整的集群成员(可能会动态更改),所以此列表不需要包含完整集 的服务器(您可能需要多个服务器,以防指定的服务器关闭)。

list
replication.factor 流处理程序创建更改日志topic和重新分配topic的副本数 int 1
state.dir 状态存储的目录地址。 string /tmp/kafka-streams
cache.max.bytes.buffering 用于缓冲所有线程的最大内存字节数 long 10485760 [0,...]
client.id 发出请求时传递给服务器的id字符串。

这样做的目的是通过允许将逻辑应用程序名称包含在服务器端请求日志记录中,来追踪请求源的ip/port。

string ""
default.key.serde 用于实现Serde接口的key的默认序列化器/解串器类。 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.timestamp.extractor 实现TimestampExtractor接口的默认时间戳提取器类。 class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde 用于实现Serde接口的值的默认serializer / deserializer类。 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
num.standby.replicas 每个任务的备用副本数。 int 0
num.stream.threads 执行流处理的线程数。 int 1
processing.guarantee 应使用的加工保证。

可能的值为at_least_once(默认)和exact_once。

string at_least_once [at_least_once, exactly_once]
security.protocol 用于与broker沟通的协议。

有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。

string PLAINTEXT
application.server host:port指向用户嵌入定义的末端,可用于发现单个KafkaStreams应用程序中状态存储的位置 string ""
buffered.records.per.partition 每个分区缓存的最大记录数。 int 1000
commit.interval.ms 用于保存process位置的频率。

注意,如果'processing.guarantee'设置为'exact_once',默认值为100,否则默认值为30000。

long 30000
connections.max.idle.ms 关闭闲置的连接时间(以毫秒为单位)。 long 540000
key.serde 用于实现Serde接口的key的Serializer/deserializer类.此配置已被弃用,请改用default.key.serde class null
metadata.max.age.ms 即使我们没有看到任何分区leader发生变化,主动发现新的broker或分区,强制更新元数据时间(以毫秒为单位)。 long 300000 [0,...]
metric.reporters metric reporter的类列表。

实现MetricReporter接口,JmxReporter始终包含在注册JMX统计信息中。

list ""
metrics.num.samples 保持的样本数以计算度量。 int 2 [1,...]
metrics.recording.level 日志级别。 string INFO [INFO, DEBUG]
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 [0,...]
partition.grouper 实现PartitionGrouper接口的Partition grouper类。 class org.apache.kafka.streams.processor.DefaultPartitionGrouper
poll.ms 阻塞输入等待的时间(以毫秒为单位)。 long 100
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。

如果值为-1,则将使用OS默认值。

int 32768 [0,...]
reconnect.backoff.max.ms 因故障无法重新连接broker,重新连接的等待的最大时间(毫秒)。

如果提供,每个主机会连续增加,直到达到最大值。随机递增20%的随机抖动以避免连接风暴。

long 1000 [0,...]
reconnect.backoff.ms 尝试重新连接之前等待的时间。避免在高频繁的重复连接服务器。

这种backoff适用于消费者向broker发送的所有请求。

long 50 [0,...]
request.timeout.ms 控制客户端等待请求响应的最长时间。

如果在配置时间内未收到响应,客户端将在需要时重新发送请求,如果重试耗尽,则请求失败。

int 40000 [0,...]
retry.backoff.ms 尝试重试失败请求之前等待的时间。以避免了在某些故障情况下,在频繁重复发送请求。 long 100 [0,...]
rocksdb.config.setter 一个Rocks DB配置setter类,或实现RocksDBConfigSetter接口的类名 null
send.buffer.bytes 发送数据时要使用的TCP发送缓冲区(SO_SNDBUF)的大小。

如果值为-1,则将使用OS默认值。

int 131072 [0,...]
state.cleanup.delay.ms 在分区迁移删除状态之前等待的时间(毫秒)。 long 60000
timestamp.extractor 实现TimestampExtractor接口的Timestamp抽取器类。
  • 此配置已弃用,请改用default.timestamp.extractor
class null
windowstore.changelog.additional.retention.ms 添加到Windows维护管理器以确保数据不会从日志中过早删除。

默认为1天

long 86400000
zookeeper.connect Zookeeper连接字符串,用于Kafka主题管理。
  • 此配置已被弃用,将被忽略,因为Streams API不再使用Zookeeper。
string ""

Connect

Kafka Connect框架的相关配置:

名称 描述 类型 默认 有效值 重要程度
config.storage.topic kafka topic仓库配置 string
group.id 唯一的字符串,用于标识此worker所属的Connect集群组。 string
key.converter 用于Kafka Connect和写入到Kafka的序列化消息的之间格式转换的转换器类。

这可以控制写入或从kafka读取的消息中的键的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。

class
offset.storage.topic 连接器的offset存储到哪个topic中 string
status.storage.topic 追踪连接器和任务状态存储到哪个topic中 string
value.converter 用于Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。

控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。

class
internal.key.converter 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。

这可以控制写入或从Kafka读取的消息中的key的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。

class
internal.value.converter 用于在Kafka Connect格式和写入Kafka的序列化格式之间转换的转换器类。

这控制了写入或从Kafka读取的消息中的值的格式,并且由于这与连接器无关,因此它允许任何连接器使用任何序列化格式。 常见格式的示例包括JSON和Avro。 此设置用于控制框架内部使用的记账数据的格式,例如配置和偏移量,因此用户可以使用运行各种Converter实现。

class
bootstrap.servers 用于建立与Kafka集群的初始连接的主机/端口列表。此列表用来发现完整服务器集的初始主机。

该列表的格式应为host1:port1,host2:port2,.... 由于这些服务器仅用于初始连接以发现完整的集群成员资格(可能会动态更改),因此,不需要包含完整的服务器(尽管如此,你需要多配置几个,以防止配置的宕机)。

list localhost:9092
heartbeat.interval.ms 心跳间隔时间。心跳用于确保会话保持活动,并在新成员加入或离开组时进行重新平衡。 该值必须设置为低于session.timeout.ms,但通常应设置为不高于该值的1/3。 int 3000
rebalance.timeout.ms 限制所有组中消费者的任务处理数据和提交offset所需的时间。

如果超时,那么woker将从组中删除,这也将导致offset提交失败。

int 60000
session.timeout.ms 用于察觉worker故障的超时时间。

worker定时发送心跳以表明自己是活着的。如果broker在会话超时时间到期之前没有接收到心跳,那么broker将从分组中移除该worker,并启动重新平衡。 注意,该值必须在group.min.session.timeout.ms和group.max.session.timeout.ms范围内。

int 10000
ssl.key.password 密钥存储文件中私钥的密码。

这对于客户端是可选的。

password null
ssl.keystore.location 密钥存储文件的位置。

这对于客户端是可选的,可以用于客户端的双向身份验证。

string null
ssl.keystore.password 密钥存储文件的存储密码。

客户端是可选的,只有配置了ssl.keystore.location才需要。

password null
ssl.truststore.location 信任存储文件的位置。 string null
ssl.truststore.password 信任存储文件的密码。 password null
connections.max.idle.ms 多少毫秒之后关闭空闲的连接。 long 540000
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。

如果值为-1,则将使用OS默认值。

int 32768 [0,...]
request.timeout.ms 配置控制客户端等待请求响应的最长时间。

如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。

int 40000 [0,...]
sasl.jaas.config 用于JAAS配置文件的SASL连接的JAAS登录上下文参数格式。

这里描述了JAAS配置文件的格式。该值的格式为:' (=)*;'

password null
sasl.kerberos.service.name Kafka运行的Kerberos principal名称。

可以在Kafka的JAAS配置或Kafka的配置中定义。

string null
sasl.mechanism 用户客户端连接的SASL机制。可以提供者任何安全机制。

GSSAPI是默认机制。

string GSSAPI
security.protocol 用于和broker通讯的策略。

有效的值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。

string PLAINTEXT
send.buffer.bytes 发送数据时使用TCP发送缓冲区(SO_SNDBUF)的大小。

如果值为-1,则将使用OS默认。

int 131072 [-1,...]
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type 密钥存储文件的文件格式。

对于客户端是可选的。

string JKS
ssl.protocol 用于生成SSLContext的SSL协议。

默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中的允许值为TLS,TLSv1.1和TLSv1.2。 旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。

string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。

默认值是JVM的默认安全提供程序。

string null
ssl.truststore.type 信任存储文件的文件格式。 string JKS
worker.sync.timeout.ms 当worker与其他worker不同步并需要重新同步配置时,需等待一段时间才能离开组,然后才能重新加入。 int 3000
worker.unsync.backoff.ms 当worker与其他worker不同步,并且无法在worker.sync.timeout.ms 期间追赶上,在重新连接之前,退出Connect集群的时间。 int 300000
access.control.allow.methods 通过设置Access-Control-Allow-Methods标头来设置跨源请求支持的方法。

Access-Control-Allow-Methods标头的默认值允许GET,POST和HEAD的跨源请求。

string ""
access.control.allow.origin 将Access-Control-Allow-Origin标头设置为REST API请求。

要启用跨源访问,请将其设置为应该允许访问API的应用程序的域,或者 *" 以允许从任何的域。 默认值只允许从REST API的域访问。

string ""
client.id 在发出请求时传递给服务器的id字符串。

这样做的目的是通过允许逻辑应用程序名称包含在请求消息中,来跟踪请求来源。而不仅仅是ip/port

string ""
config.storage.replication.factor 当创建配置仓库topic时的副本数 short 3 [1,...]
metadata.max.age.ms 在没有任何分区leader改变,主动地发现新的broker或分区的时间。 long 300000 [0,...]
metric.reporters A list of classes to use as metrics reporters.

Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.

list ""
metrics.num.samples 保留计算metrics的样本数(译者不清楚是做什么的) int 2 [1,...]
metrics.sample.window.ms The window of time a metrics sample is computed over. long 30000 [0,...]
offset.flush.interval.ms 尝试提交任务偏移量的间隔。 long 60000
offset.flush.timeout.ms 在取消进程并恢复要在之后尝试提交的offset数据之前,等待消息刷新并分配要提交到offset仓库的offset数据的最大毫秒数。 long 5000
offset.storage.partitions 创建offset仓库topic的分区数 int 25 [1,...]
offset.storage.replication.factor 创建offset仓库topic的副本数 short 3 [1,...]
plugin.path 包含插件(连接器,转换器,转换)逗号(,)分隔的路径列表。

该列表应包含顶级目录,其中包括以下任何组合:a)包含jars与插件及其依赖关系的目录 b)具有插件及其依赖项的uber-jars c)包含插件类的包目录结构的目录及其依赖关系, 注意:将遵循符号链接来发现依赖关系或插件。 示例:plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors

list null
reconnect.backoff.max.ms 无法连接broker时等待的最大时间(毫秒)。

如果设置,则每个host的将会持续的增加,直到达到最大值。 计算增加后,再增加20%的随机抖动,以避免高频的反复连接。

long 1000 [0,...]
reconnect.backoff.ms 尝试重新连接到主机之前等待的时间。

避免了高频率反复的连接主机。 这种机制适用于消费者向broker发送的所有请求。

long 50 [0,...]
rest.advertised.host.name 如果设置,其他wokers将通过这个hostname进行连接。 string null
rest.advertised.port 如果设置,其他的worker将通过这个端口进行连接。 int null
rest.host.name REST API的主机名。

如果设置,它将只绑定到这个接口。

string null
rest.port 用于监听REST API的端口 int 8083
retry.backoff.ms 失败请求重新尝试之前的等待时间,避免了在某些故障的情况下,频繁的重复发送请求。 long 100 [0,...]
sasl.kerberos.kinit.cmd Kerberos kinit命令路径. string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 尝试refresh之间登录线程的休眠时间. long 60000
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程将休眠,直到从上次刷新ticket到期,此时将尝试续订ticket。 double 0.8
ssl.cipher.suites 密码套件列表。用于TLS或SSL网络协议协商网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。

默认情况下,支持所有可用的密码套件。

list null
ssl.endpoint.identification.algorithm 末端识别算法使用服务器证书验证服务器主机名。 string null
ssl.keymanager.algorithm 用于SSL连接的key管理工厂的算法,默认值是Java虚拟机配置的密钥管理工厂算法。 string SunX509
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
ssl.trustmanager.algorithm 用于SSL连接的信任管理仓库算法。

默认值是Java虚拟机配置的信任管理器工厂算法。

string PKIX
status.storage.partitions 用于创建状态仓库topic的分区数 int 5 [1,...]
status.storage.replication.factor 用于创建状态仓库topic的副本数 short 3 [1,...]
task.shutdown.graceful.timeout.ms 等待任务正常关闭的时间,这是总时间,不是每个任务,所有任务触发关闭,然后依次等待。 long 5000
kafka >= 2.0.0
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。

对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class null
sasl.login.class 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。

例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

class null
kafka >= 2.7
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。

默认的SSL引擎工厂只支持带X.509证书的PEM格式。

password null
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。

连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。

long 127000 (127 seconds)
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。

如果在超时之前没有建立连接,客户端将关闭socket通道。

long 10000 (10 seconds)

AdminClient

Kafka Admin客户端的配置:

名称 描述 类型 默认 有效值 重要程度
bootstrap.servers host/port,用于和kafka集群建立初始化连接。

因为这些服务器地址仅用于初始化连接,并通过现有配置的来发现全部的kafka集群成员(集群随时会变化),所以此列表不需要包含完整的集群地址(但尽量多配置几个,以防止配置的服务器宕机)。

list
ssl.key.password 密钥仓库文件中的私钥密码。

对于客户端是可选的。

password null
ssl.keystore.location 密钥仓库文件的位置。

这对于客户端是可选的,可以用于客户端的双向认证。

string null
ssl.keystore.password 密钥仓库文件的仓库密钥。

这对于客户端是可选的,只有配置了ssl.keystore.location才需要。

password null
ssl.truststore.location 信任存储文件的位置。 string null
ssl.truststore.password 信任存储文件的密码。

如果未设置密码,对信任库的访问仍然可用,但是完整性检查将被禁用。

password null
client.id 在发出请求时传递给服务器的id字符串。

这样做的目的是通过允许在服务器端请求日志记录中包含逻辑应用程序名称来跟踪请求源的ip/port。

string ""
connections.max.idle.ms 关闭闲置连接的时间。 long 300000
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。

如果值为-1,则将使用OS默认值。

int 65536 [-1,...]
request.timeout.ms 配置控制客户端等待请求响应的最长时间。

如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。

int 120000 [0,...]
sasl.jaas.config JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:' (=)*;' password null
sasl.kerberos.service.name Kafka运行的Kerberos principal名。

可以在Kafka的JAAS配置或Kafka的配置中定义。

string null
sasl.mechanism 用于客户端连接的SASL机制。

安全提供者可用的任何机制。GSSAPI是默认机制。

string GSSAPI
security.protocol 与broker通讯的协议。

有效的值有: PLAINTEXT, SSL, SASL_PLAINTEXT,SASL_SSL.

string PLAINTEXT
send.buffer.bytes 发送数据时时使用TCP发送缓冲区(SO_SNDBUF)的大小。

如果值为-1,则使用OS默认值。

int 131072 [-1,...]
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type 密钥仓库文件的文件格式。

对于客户端是可选的。

string JKS
ssl.protocol 用于生成SSLContext的SSL协议。

默认设置是TLS,这对大多数情况都是适用的。 最新的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。 较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用。

string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。

默认值是JVM的默认安全提供程序。

string null
ssl.truststore.type 信任仓库文件的文件格式 string JKS
metadata.max.age.ms 我们强制更新元数据的时间段(以毫秒为单位),即使我们没有任何分区leader发生变化,主动发现任何新的broker或分区。 long 300000 [0,...]
metric.reporters 用作指标记录的类的列表。

实现MetricReporter接口,以允许插入将被通知新的度量创建的类。 JmxReporter始终包含在注册JMX统计信息中。

list ""
metrics.num.samples 用于计算度量维护的样例数。 int 2 [1,...]
metrics.recording.level The highest recording level for metrics. string INFO [INFO, DEBUG]
metrics.sample.window.ms 时间窗口计算度量标准。 long 30000 [0,...]
reconnect.backoff.max.ms 重新连接到重复无法连接的broker程序时等待的最大时间(毫秒)。

如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。

long 1000 [0,...]
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。

这避免了在频繁的重复连接主机。此配置适用于client对broker的所有连接尝试。

long 50 [0,...]
retries 在失败之前重试调用的最大次数 int 5 [0,...]
retry.backoff.ms 尝试重试失败的请求之前等待的时间。

这样可以避免在某些故障情况下以频繁的重复发送请求。

long 100 [0,...]
sasl.kerberos.kinit.cmd Kerberos kinit命令路径。 string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 刷新尝试之间的登录线程睡眠时间。 long 60000
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动百分比。 double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程将休眠,直到从上次刷新到“票”到期时间的指定窗口为止,此时将尝试续订“票”。 double 0.8
ssl.cipher.suites 密码套件列表。

是TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。 默认情况下,支持所有可用的密码套件。

list null
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null
ssl.keymanager.algorithm 用于SSL连接的密钥管理工厂算法。默认值是Java虚拟机配置的密钥管理器工厂算法。 string SunX509
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
ssl.trustmanager.algorithm 用于SSL连接的信任管理工厂算法,默认是Java虚拟机机制。 string PKIX
kafka >= 2.0.0
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。

对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class null
sasl.login.class 实现Login接口的类的全称。

对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

class null
kafka >= 2.1.0
client.dns.lookup 控制客户端如何使用DNS查询。
  • 如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。
    一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。
  • 如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。
    在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。
  • 如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。
string use_all_dns_ips [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only]
kafka >= 2.7
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。

默认的SSL引擎工厂只支持带X.509证书的PEM格式。

password null
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。

连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。

long 127000 (127 seconds)
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。

如果在超时之前没有建立连接,客户端将关闭socket通道。

long 10000 (10 seconds)