Kafka:安装、配置

来自Wikioe
Eijux讨论 | 贡献2021年5月19日 (三) 03:20的版本 →‎Topic
跳到导航 跳到搜索


关于

Kafka 解压即用,并没有繁琐的安装步骤,唯一注意的是其需要 Zookeeper 支持(但其自带有 Zookeeper)。


  1. 官网下载压缩包:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
  2. 提取 tar 文件:
    $ cd opt/
    $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
    
  3. 启动服务器:
    $ cd kafka_2.11.0.9.0.0
    $ bin/kafka-server-start.sh config/server.properties
    
    服务器启动后,会得到以下响应:
    $ bin/kafka-server-start.sh config/server.properties
    [2016-01-02 15:37:30,410] INFO KafkaConfig values:
    request.timeout.ms = 30000
    log.roll.hours = 168
    inter.broker.protocol.version = 0.9.0.X
    log.preallocate = false
    security.inter.broker.protocol = PLAINTEXT
    …………………………………………….
    …………………………………………….
    
  4. 停止服务器:
    $ bin/kafka-server-stop.sh config/server.properties
    


NOTE:

  • 启动 kafka 需要执行 kafka-server-start.bat 文件,需要传入一个路径参数(server.config 文件的路径):
    1. 如果使用自行安装的 Zookeeper(先启动 Zookeeper,再启动 Kafka),使用“config/server.properties”。
    2. 如果使用 Kafka 自带的 Zookeeper(直接启动 Kafka 即可),使用:“config/zookeeper.properties”。
    properties 文件都位于“../kafka_x.xx.x.x.x.x/config”中。

配置

Broker

基本配置如下:

  • broker.id
  • log.dirs
  • zookeeper.connect

详细配置:

名称 描述 类型 默认 有效值 重要程度 更新模式
kafka >= 0.10版
zookeeper.connect zookeeper host string string
advertised.host.name 过时的:当advertised.listeners或listeners没设置时候才使用。
  • 请改用advertised.listeners。Hostname发布到Zookeeper供客户端使用。
  • 在IaaS环境中,Broker可能需要绑定不同的接口。
  • 如果没有设置,将会使用host.name(如果配置了)。否则将从java.net.InetAddress.getCanonicalHostName()获取。
string null
advertised.listeners 发布到Zookeeper供客户端使用监听(如果不同)。在IaaS环境中,broker可能需要绑定不同的接口。如果没设置,则使用listeners。 string null
advertised.port 过时的:当advertised.listeners或listeners没有设置才使用。
  • 请改用advertised.listeners。
  • 端口发布到Zookeeper供客户端使用,在IaaS环境中,broker可能需要绑定到不同的端口。
  • 如果没有设置,将和broker绑定的同一个端口。
int null
auto.create.topics.enable 启用自动创建topic boolean true
auto.leader.rebalance.enable 启用自动平衡leader。如果需要,后台线程会定期检查并触发leader平衡。 boolean true
background.threads 用于各种后台处理任务的线程数 int 10 [1,...]
broker.id 服务器的broker id。
  • 如果未设置,将生成一个独一无二的broker id。

要避免zookeeper生成的broker id和用户配置的broker id冲突,从reserved.broker.max.id + 1开始生成。

int -1
compression.type 为给定topic指定最终的压缩类型。支持标准的压缩编码器('gzip', 'snappy', 'lz4')。也接受'未压缩',就是没有压缩。保留由producer设置的原始的压缩编码。 string producer
delete.topic.enable 启用删除topic。如果此配置已关闭,通过管理工具删除topic将没有任何效果 boolean false
host.name 过时的:当listeners没有设置才会使用。
  • 请改用listeners。
  • 如果设置,它将只绑定到此地址。如果没有设置,它将绑定到所有接口
string ""
leader.imbalance.check.interval.seconds 由控制器触发分区再平衡检查的频率 long 300
leader.imbalance.per.broker.percentage 允许每个broker的leader比例不平衡。如果每个broker的值高于此值,控制器将触发leader平衡,该值以百分比的形式指定。 int 10
listeners 监听列表 - 监听逗号分隔的URL列表和协议。
  • 指定hostname为0.0.0.0绑定到所有接口,将hostname留空则绑定到默认接口。
  • 合法的listener列表是:
    PLAINTEXT://myhost:9092,TRACE://:9091
    PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093
string null
log.dir 保存日志数据的目录 (补充log.dirs属性) string /tmp/kafka-logs
log.dirs 保存日志数据的目录。如果未设置,则使用log.dir中的值 string null
log.flush.interval.messages 消息刷新到磁盘之前,累计在日志分区的消息数 long 9223372036854775807 [1,...]
log.flush.interval.ms topic中的消息在刷新到磁盘之前保存在内存中的最大时间(以毫秒为单位),如果未设置,则使用log.flush.scheduler.interval.ms中的值 null
log.flush.offset.checkpoint.interval.ms 我们更新的持续记录的最后一次刷新的频率。作为日志的恢复点。 int 60000 [0,...]
log.flush.scheduler.interval.ms 日志刷新的频率(以毫秒为单位)检查是否有任何日志需要刷新到磁盘 long 9223372036854775807
log.retention.bytes 删除日志之前的最大大小 long -1
log.retention.hours 删除日志文件保留的小时数(以小时为单位)。第三级是log.retention.ms属性 int 168
log.retention.minutes 删除日志文件之前保留的分钟数(以分钟为单位)。次于log.retention.ms属性。如果没设置,则使用log.retention.hours的值。 int null
log.retention.ms 删除日志文件之前保留的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes的值。 long null
log.roll.hours 新建一个日志段的最大时间(以小时为单位),次于log.roll.ms属性 int 168 [1,...]
log.roll.jitter.hours 从logRollTimeMillis(以小时为单位)减去最大抖动,次于log.roll.jitter.ms属性。 int 0 [0,...]
log.roll.ms 新建一个日志段之前的最大事时间(以毫秒为单位)。如果未设置,则使用log.roll.hours的值。 long null
log.segment.bytes 单个日志文件的最大大小 int 1073741824 [14,...]
log.segment.delete.delay.ms 从文件系统中删除文件之前的等待的时间 long 60000 [0,...]
message.max.bytes 服务器可以接收的消息的最大大小 int 1000012 [0,...]
min.insync.replicas 当producer设置acks为"all"(或"-1")时。min.insync.replicas指定必须应答成功写入的replicas最小数。
  • 如果不能满足最小值,那么producer抛出一个异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend)。
  • 当一起使用时,min.insync.replicas和acks提供最大的耐用性保证。
    一个典型的场景是创建一个复制因子3的topic,设置min.insync.replicas为2,并且ack是“all”。
  • 如果多数副本没有接到写入时,将会抛出一个异常。
int 1 [1,...]
num.io.threads 服务器用于执行网络请求的io线程数 int 8 [1,...]
num.network.threads 服务器用于处理网络请求的线程数。 int 3 [1,...]
num.recovery.threads.per.data.dir 每个数据的目录线程数,用于启动时日志恢复和关闭时flush。 int 1 [1,...]
num.replica.fetchers 从源broker复制消息的提取线程数。递增该值可提高 follower broker的I/O的并发。 int 1
offset.metadata.max.bytes offset提交关联元数据条目的最大大小 int 4096
offsets.commit.required.acks commit之前需要的应答数,通常,不应覆盖默认的(-1) short -1
offsets.commit.timeout.ms Offset提交延迟,直到所有副本都收到提交或超时。 这类似于生产者请求超时。 int 5000 [1,...]
offsets.load.buffer.size 当加载offset到缓存时,从offset段读取的批量大小。 int 5242880 [1,...]
offsets.retention.check.interval.ms 检查过期的offset的频率。 long 600000 [1,...]
offsets.retention.minutes offset topic的日志保留时间(分钟) int 1440 [1,...]
offsets.topic.compression.codec 压缩编码器的offset topic - 压缩可以用于实现“原子”提交 int 0
offsets.topic.num.partitions offset commit topic的分区数(部署之后不应更改) int 50 [1,...]
offsets.topic.replication.factor offset topic复制因子(ps:就是备份数,设置的越高来确保可用性)。

为了确保offset topic有效的复制因子,第一次请求offset topic时,活的broker的数量必须最少最少是配置的复制因子数。 如果不是,offset topic将创建失败或获取最小的复制因子(活着的broker,复制因子的配置)

short 3 [1,...]
offsets.topic.segment.bytes offset topic段字节应该相对较小一点,以便于加快日志压缩和缓存加载 int 104857600 [1,...]
port 过时的:当listener没有设置才使用。请改用listeners。该port监听和接收连接。 int 9092
queued.max.requests 在阻塞网络线程之前允许的排队请求数 int 500 [1,...]
quota.consumer.default 过时的:当默认动态的quotas没有配置或在Zookeeper时。如果每秒获取的字节比此值高,所有消费者将通过clientId/consumer区分限流。 long 9223372036854775807 [1,...]
quota.producer.default 过时的:当默认动态的quotas没有配置,或在zookeeper时。如果生产者每秒比此值高,所有生产者将通过clientId区分限流。 long 9223372036854775807 [1,...]
replica.fetch.min.bytes Minimum 每个获取响应的字节数。如果没有满足字节数,等待replicaMaxWaitTimeMs。 int 1
replica.fetch.wait.max.ms 跟随者副本发出每个获取请求的最大等待时间,此值应始终小于replica.lag.time.max.ms,以防止低吞吐的topic的ISR频繁的收缩。 int 500
replica.high.watermark.checkpoint.interval.ms 达到高“水位”保存到磁盘的频率。 long 5000
replica.lag.time.max.ms 如果一个追随者没有发送任何获取请求或至少在这个时间的这个leader的没有消费完。该leader将从isr中移除这个追随者。 long 10000
replica.socket.receive.buffer.bytes 用于网络请求的socket接收缓存区 int 65536
replica.socket.timeout.ms 网络请求的socket超时,该值最少是replica.fetch.wait.max.ms int 30000
request.timeout.ms 该配置控制客户端等待请求的响应的最大时间。

如果超过时间还没收到消费。客户端将重新发送请求,如果重试次数耗尽,则请求失败。

int 30000
socket.receive.buffer.bytes socket服务的SO_RCVBUF缓冲区。如果是-1,则默认使用OS的。 int 102400
socket.request.max.bytes socket请求的最大字节数 int 104857600 [1,...]
socket.send.buffer.bytes socket服务的SO_SNDBUF缓冲区。如果是-1,则默认使用OS的。 int 102400
unclean.leader.election.enable 是否启用不在ISR中的副本参与选举leader的最后的手段。这样做有可能丢失数据。 boolean true
zookeeper.connection.timeout.ms 连接zookeeper的最大等待时间,如果未设置,则使用zookeeper.session.timeout.ms。 int null
zookeeper.session.timeout.ms Zookeeper会话的超时时间 int 6000
zookeeper.set.acl 设置客户端使用安全的ACL boolean false
broker.id.generation.enable 启用自动生成broker id。启用该配置时应检查reserved.broker.max.id。 boolean true
broker.rack broker机架,用于机架感知副本分配的失败容错。例如:RACK1, us-east-1d string null
connections.max.idle.ms Idle 连接超时:闲置时间超过该设置,则服务器socket处理线程关闭这个连接。 long 600000
controlled.shutdown.enable 启用服务器的关闭控制。 boolean true
controlled.shutdown.max.retries 控制因多种原因导致的shutdown失败,当这样失败发生,尝试重试的次数 int 3
controlled.shutdown.retry.backoff.ms 在每次重试之前,系统需要时间从导致先前故障的状态(控制器故障转移,复制延迟等)恢复。 此配置是重试之前等待的时间数。 long 5000
controller.socket.timeout.ms 控制器到broker通道的sockt超时时间 int 30000
default.replication.factor 自动创建topic的默认的副本数 int 1
fetch.purgatory.purge.interval.requests 拉取请求清洗间隔(请求数) int 1000
group.max.session.timeout.ms 已注册的消费者允许的最大会话超时时间,设置的时候越长使消费者有更多时间去处理心跳之间的消息。但察觉故障的时间也拉长了。 int 300000
group.min.session.timeout.ms 已经注册的消费者允许最小的会话超时时间,更短的时间去快速的察觉到故障,代价是频繁的心跳,这可能会占用大量的broker资源。 int 6000
inter.broker.protocol.version 指定broker内部通讯使用的版本。通常在更新broker时使用。有效的值为:0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1。查看ApiVersion找到的全部列表。 string 0.10.1-IV2
log.cleaner.backoff.ms 当没有日志要清理时,休眠的时间 long 15000 [0,...]
log.cleaner.dedupe.buffer.size 用于日志去重的内存总量(所有cleaner线程) long 134217728
log.cleaner.delete.retention.ms 删除记录保留多长时间? long 86400000
log.cleaner.enable 在服务器上启用日志清洗处理?如果使用的任何topic的cleanup.policy=compact包含内部的offset topic,应启动。如果禁用,那些topic将不会被压缩并且会不断的增大。 boolean true
log.cleaner.io.buffer.load.factor 日志cleaner去重缓冲负载因子。去重缓冲区的百分比,较高的值将允许同时清除更多的日志,但将会导致更多的hash冲突。 double 0.9
log.cleaner.io.buffer.size 所有日志清洁器线程I/O缓存的总内存 int 524288 [0,...]
log.cleaner.io.max.bytes.per.second 日志清理器限制,以便其读写i/o平均小与此值。 double 1.7976931348623157E308
log.cleaner.min.cleanable.ratio 脏日志与日志的总量的最小比率,以符合清理条件 double 0.5
log.cleaner.min.compaction.lag.ms 一条消息在日志保留不压缩的最小时间,仅适用于正在压缩的日志。 long 0
log.cleaner.threads 用于日志清除的后台线程数 int 1 [0,...]
log.cleanup.policy 超过保留时间段的默认清除策略。逗号分隔的有效的策略列表。有效的策略有:“delete”和“compact” list [delete] [compact, delete]
log.index.interval.bytes 添加一个条目到offset的间隔 int 4096(4 kibibytes) [0,...]
log.index.size.max.bytes offset index的最大大小(字节) int 10485760 [4,...]
log.message.format.version 指定追加到日志中的消息格式版本。例如: 0.8.2, 0.9.0.0, 0.10.0。

通过设置一个特定消息格式版本,用户需要保证磁盘上所有现有的消息小于或等于指定的版本。错误的设置将导致旧版本的消费者中断,因为消费者接收一个不理解的消息格式。

string 0.10.1-IV2
log.message.timestamp.difference.max.ms 如果log.message.timestamp.type=CreateTime,broker接收消息时的时间戳和消息中指定的时间戳之间允许的最大差异。
  • 如果时间戳超过此阈值,则消息将被拒绝。如果log.message.timestamp.type=LogAppendTime,则此配置忽略。
long 9223372036854775807 [0,...]
log.message.timestamp.type 定义消息中的时间戳是消息创建时间或日志追加时间。该值可设置为CreateTime 或 LogAppendTime string CreateTime [CreateTime, LogAppendTime]
log.preallocate 在创建新段时预分配文件?如果你在Windowns上使用kafka,你可能需要设置它为true。 boolean false
log.retention.check.interval.ms 日志清除程序检查日志是否满足被删除的频率(以毫秒为单位) long 300000 [1,...]
max.connections.per.ip 允许每个ip地址的最大连接数。 int 2147483647 [1,...]
max.connections.per.ip.overrides per-ip或hostname覆盖默认最大连接数 string ""
num.partitions topic的默认分区数 int 1 [1,...]
principal.builder.class 实现PrincipalBuilder接口类的完全限定名,该接口目前用于构建与SSL SecurityProtocol连接的Principal。 class class org。apache。kafka。common。security。auth。DefaultPrincipalBuilder(wiki傻逼编辑器:。换位.)  
producer.purgatory.purge.interval.requests 生产者请求purgatory的清洗间隔(请求数) int 1000
replica.fetch.backoff.ms 当拉取分区发生错误时休眠的时间 int 1000 [0,...]
replica.fetch.max.bytes 拉取每个分区的消息的字节数。这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 int 1048576 [0,...]
replica.fetch.response.max.bytes 预计整个获取响应的最大字节数,这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。

通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。

int 10485760 [0,...]
reserved.broker.max.id broker.id的最大数 int 1000 [0,...]
sasl.enabled.mechanisms 可用的SASL机制列表,包含任何可用的安全提供程序的机制。默认情况下只有GSSAPI是启用的。 list [GSSAPI]
sasl.kerberos.kinit.cmd Kerberos kinit 命令路径。 string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin 登录线程在刷新尝试的休眠时间。 long 60000
sasl.kerberos.principal.to.local.rules principal名称映射到一个短名称(通常是操作系统用户名)。按顺序,使用与principal名称匹配的第一个规则将其映射其到短名称。忽略后面的规则。

默认情况下,{username}/{hostname}@{REALM} 映射到 {username}。

list [DEFAULT]
sasl.kerberos.service.name Kafka运行的Kerberos principal名称。可以在JAAS或Kafka的配置文件中定义。 string null
sasl.kerberos.ticket.renew.jitter 添加到更新时间的随机抖动的百分比 time. double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程休眠,直到从上次刷新到ticket的到期的时间已到达(指定窗口因子),在此期间它将尝试更新ticket。 double 0.8
sasl.mechanism.inter.broker.protocol SASL机制,用于broker之间的通讯,默认是GSSAPI。 string GSSAPI
security.inter.broker.protocolSecurity broker之间的通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 string PLAINTEXT
ssl.cipher.suites 密码套件列表。认证,加密,MAC和秘钥交换算法的组合,用于使用TLS或SSL的网络协议交涉网络连接的安全设置,默认情况下,支持所有可用的密码套件。 list null
ssl.client.auth 配置请求客户端的broker认证。

常见的设置:

  • “ssl.client.auth=required”需要客户端认证。
  • “ssl.client.auth=requested”客户端认证可选,不同于requested,客户端可选择不提供自身的身份验证信息。
  • “ssl.client.auth=none”不需要客户端身份认证
string none [required, requested, none]
ssl.enabled.protocols 已启用的SSL连接协议列表。 list [TLSv1.2, TLSv1.1, TLSv1]
ssl.key.password 秘钥库文件中的私钥密码。对客户端是可选的。 password null
ssl.keymanager.algorithm 用于SSL连接的密钥管理工厂算法。默认值是Java虚拟机的密钥管理工厂算法。 string SunX509
ssl.keystore.location 密钥仓库文件的位置。客户端可选,并可用于客户端的双向认证。 string null
ssl.keystore.password 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 password null
ssl.keystore.type 密钥仓库文件的格式。客户端可选。 string JKS
ssl.protocol 用于生成SSLContext,默认是TLS,适用于大多数情况。允许使用最新的JVM,LS, TLSv1.1 和TLSv1.2。 SSL,SSLv2和SSLv3 老的JVM也可能支持,由于有已知的安全漏洞,不建议使用。 string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。默认值是JVM的安全程序。 string null
ssl.trustmanager.algorithm 信任管理工厂用于SSL连接的算法。默认为Java虚拟机配置的信任算法。 string PKIX
ssl.truststore.location 信任仓库文件的位置 string null
ssl.truststore.password 信任仓库文件的密码 password null
ssl.truststore.type 信任仓库文件的文件格式 string JKS
authorizer.class.name 用于认证的授权程序类 string ""
metric.reporters 度量报告的类列表,通过实现MetricReporter接口,允许插入新度量标准类。JmxReporter包含注册JVM统计。 list []
metrics.num.samples 维持计算度量的样本数。 int 2 [1,...]
metrics.sample.window.ms 计算度量样本的时间窗口 long 30000 [1,...]
quota.window.num 在内存中保留客户端限额的样本数 int 11 [1,...]
quota.window.size.seconds 每个客户端限额的样本时间跨度 int 1 [1,...]
replication.quota.window.num 在内存中保留副本限额的样本数 int 11 [1,...]
replication.quota.window.size.seconds 每个副本限额样本数的时间跨度 int 1 [1,...]
ssl.endpoint.identification.algorithm 端点身份标识算法,使用服务器证书验证服务器主机名。 string null
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
zookeeper.sync.time.ms ZK follower可落后与leader多久。 int 2000
以下是kafka新版本的增量配置
kafka >= 1.0
group.initial.rebalance.delay.ms 分组协调器在执行第一次重新平衡之前,等待更多消费者加入新组的时间。延迟时间越长,意味着重新平衡的次数可能越少,但会增加处理开始前的时间。 int 3000 只读
transaction.abort.timed.out.transaction.cleanup.interval.ms 回滚已超时的事务的时间间隔。 int 10000 (10 seconds) [1,...] 只读
transaction.remove.expired.transaction.cleanup.interval.ms 删除因transactional.id.expiration.ms过期的事务的时间间隔。 int 3600000 (1 hour) [1,...] 只读
transaction.max.timeout.ms 事务的最大允许超时时间。如果客户端请求的交易时间超过了这个时间,那么broker将在InitProducerIdRequest中返回一个错误。

这可以防止客户端的超时时间过大,从而阻滞消费者从事务中包含的主题中读取。

int 900000 (15 minutes) [1,...] 只读
transaction.state.log.load.buffer.size 在将生产者id和事务加载到缓存中时,从事务日志段读取的批次大小(软限制,如果消息太大,则重写) int 5242880 [1,...] 只读
transaction.state.log.min.isr 覆盖事务topic的min.insync.replicas配置。 int 2 [1,...] 只读
transaction.state.log.num.partitions 事务topic的分区数(部署后不应改变)。 int 50 [1,...] 只读
transaction.state.log.replication.factor 事务topic的复制因子(设置较高来确保可用性)。内部topic创建将失败,直到集群规模满足该复制因子要求。 short 3 [1,...] 只读
transaction.state.log.segment.bytes 事务topic段的字节数应保持相对较小,以利于加快日志压缩和缓存加载速度 int 104857600 (100 mebibytes) [1,...] 只读
transactional.id.expiration.ms 事务协调器在没有收到当前事务的任何事务状态更新的情况下,在其事务id过期前等待的时间,单位为ms。

这个设置也会影响生产者id过期:一旦这个时间在给定的生产者id最后一次写入后过去,生产者id就会过期。

  • 请注意,如果由于主题的保留设置而删除了生产者id的最后一次写入,那么生产者id可能会更快过期。
int 604800000 (7 days) [1,...] 只读
kafka >= 2.0
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null 中间 只读
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。

例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class null 中间 只读
sasl.login.class 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。

例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

class null 中间 只读
kafka >= 2.5
zookeeper.clientCnxnSocket 当使用TLS连接到ZooKeeper时,通常设置为org.apache.zookeeper.ClientCnxnSocketNetty。

覆盖任何同名的zookeeper.clientCnxnSocket设置的显式值。

string null 中间 只读
zookeeper.ssl.client.enable 设置客户端连接到ZooKeeper时使用TLS。

显式的值会覆盖任何通过zookeeper.client.secure设置的值(注意名称不同)。 如果两者都没有设置,默认为false; 当为true时,必须设置zookeeper.clientCnxnSocket(通常为org.apache.zookeeper.ClientCnxnSocketNetty); 其他需要设置的值可能包括zookeeper.ssl.cipher.suites、zookeeper.ssl.crl.enable、zookeeper.ssl.enabled.protocols、zookeeper.ssl.endpoint. identification.algorithm,zookeeper.ssl.keystore.location,zookeeper.ssl.keystore.password,zookeeper.ssl.keystore.type,zookeeper.ssl. ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type。

boolean false 中间 只读
zookeeper.ssl.keystore.location 当使用客户端证书与TLS连接到ZooKeeper时的keystore位置。

覆盖任何通过zookeeper.ssl.keyStore.location系统属性设置的显式值(注意是驼峰大小)。

password null 中间 只读
zookeeper.ssl.keystore.password 当使用客户端证书与TLS连接到ZooKeeper时的keystore密码。覆盖任何通过`zookeeper.ssl.keyStore.password系统属性设置的显式值(注意驼峰大写)。 注意,ZooKeeper不支持与keystore密码不同的密钥密码,所以一定要将keystore中的密钥密码设置为与keystore密码相同,否则连接Zookeeper的尝试将失败。 password null 中间 只读
zookeeper.ssl.keystore.type 当使用客户端证书与TLS连接到ZooKeeper时的keystore类型。覆盖任何通过zookeeper.ssl.keyStore.type系统属性设置的显式值(注意骆驼大写)。

默认值为null意味着该类型将根据keystore的文件扩展名自动检测。

string null 中间 只读
zookeeper.ssl.protocol 指定ZooKeeper TLS协商中使用的协议。

一个显式的值会覆盖任何通过同名的zookeeper.ssl.protocol系统设置的值。

string TLSv1.2 只读
zookeeper.ssl.cipher.suites 指定在ZooKeeper TLS协商中使用的密码套件(csv),覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意单字 "ciphersuites")。

覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意 "ciphersuites "这个单字)。 默认值为 "null "意味着启用的密码套件列表是由正在使用的Java运行时决定的。

boolean false 只读
zookeeper.ssl.crl.enable 指定是否启用ZooKeeper TLS协议中的证书撤销列表。

覆盖任何通过zookeeper.ssl.crl系统属性设置的显式值(注意是短名)。

boolean false 只读
zookeeper.ssl.enabled.protocols 指定ZooKeeper TLS协商(csv)中启用的协议。

覆盖任何通过zookeeper.ssl.enabledProtocols系统属性设置的显式值(注意骆驼大写)。 默认值为 "null "意味着启用的协议将是zookeeper.ssl.protocol配置属性的值。

list null 只读
zookeeper.ssl.endpoint.identification.algorithm 指定是否在ZooKeeper TLS协商过程中启用主机名验证,(不区分大小写)"https "表示启用ZooKeeper主机名验证,显式的空白值表示禁用(仅为测试目的建议禁用)。

明确的值会覆盖任何通过zookeeper.ssl.hostnameVerification系统属性设置的 "true "或 "false "值(注意不同的名称和值;true意味着https,false意味着空白)。

string HTTPS 只读
zookeeper.ssl.ocsp.enable 指定是否启用ZooKeeper TLS协议中的在线证书状态协议。

覆盖任何通过zookeeper.ssl.ocsp系统属性设置的显式值(注意是短名)。

boolean false 只读
kafka >= 2.7
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。

默认的SSL引擎工厂只支持带X.509证书的PEM格式。

password null 中间 每个broker
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。

连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。

long 127000 (127 seconds) 中间 只读
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。

如果在超时之前没有建立连接,客户端将关闭socket通道。

long 10000 (10 seconds) 中间 只读

Topic

与topic相关的配置,服务器的默认值,也可选择的覆盖指定的topic。

  • 如果没有给出指定topic的配置,则将使用服务器默认值。
  • 可以通过“-config”选项在topic创建时设置。


此示例使用自定义最大消息的大小和刷新率,创建一个名为 my-topic 的topic:

> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1

也可以使用“alter configs”命令修改或设置。 此示例修改更新 my-topic 的最大的消息大小:

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
    --alter --add-config max.message.bytes=128000

可以执行以下命令验证结果:

> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe

移除设置:

> bin/kafka-configs.sh --zookeeper localhost:2181  --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes


详细配置:

名称 描述 类型 默认 有效值 服务器默认属性 更新模式
cleanup.policy “delete”或“compact”。指定在旧的日志段的保留策略。

默认策略(“delete”),将达到保留时间或大小限制的日志废弃。 “compact”则压缩日志。

list delete [compact, delete] log.cleanup.policy
compression.type 针对指定的topic设置最终的压缩方式。

标准的压缩格式有'gzip', 'snappy', lz4。还可以设置'uncompressed',就是不压缩;设置为'producer'这意味着保留生产者设置的原始压缩编解码。

string producer [uncompressed, snappy, lz4, gzip, producer] compression.type
delete.retention.ms 保留删除消息压缩topic的删除标记的时间。此设置还给出消费者如果从offset 0开始读取并确保获得最终阶段的有效快照的时间范围(否则,在完成扫描之前可能已经回收了)。 long 86400000 [0,...] log.cleaner.delete.retention.ms
file.delete.delay.ms 从文件系统中删除文件之前等待的时间 long 60000 [0,...] log.segment.delete.delay.ms
flush.messages 此设置允许指定我们强制fsync写入日志的数据的间隔。

例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。

  • 一般,我们建议不要设置它,使用复制特性来保持持久性,并允许操作系统的后台刷新功能更高效。可以在每个topic的基础上覆盖此设置(请参阅每个主题的配置部分)。
long 9223372036854775807 [0,...] log.flush.interval.messages
flush.ms 此设置允许我们强制fsync写入日志的数据的时间间隔。

例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。

  • 一般,我们建议不要设置它,并使用复制来保持持久性,并允许操作系统的后台刷新功能,因为它更有效率
long 9223372036854775807 [0,...] log.flush.interval.ms
follower.replication.throttled.replicas follower复制限流列表。

该列表应以[PartitionId]的形式描述一组副本:[BrokerId],[PartitionId]:[BrokerId]:...或者通配符'*'可用于限制此topic的所有副本。

list "" [partitionId],[brokerId]:[partitionId],[brokerId]:... follower.replication.throttled.replicas
index.interval.bytes 此设置控制Kafka向其offset索引添加索引条目的频率。默认设置确保我们大致每4096个字节索引消息。

更多的索引允许读取更接近日志中的确切位置,但使索引更大。你不需要改变这个值。

int 4096 [0,...] log.index.interval.bytes
leader.replication.throttled.replicas 在leader方面进行限制的副本列表。

该列表设置以[PartitionId]的形式描述限制副本:[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...或使用通配符‘*’限制该topic的所有副本。

list "" [partitionId],[brokerId]:[partitionId],[brokerId]:... leader.replication.throttled.replicas
max.message.bytes kafka允许的最大的消息批次大小。

如果增加此值,并且消费者的版本比0.10.2老,那么消费者的提取的大小也必须增加,以便他们可以获取大的消息批次。 在最新的消息格式版本中,消息总是分组批量来提高效率。 在之前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个消息。

int 1000012 [0,...] message.max.bytes
message.format.version 指定消息附加到日志的消息格式版本。

该值应该是一个有效的ApiVersion。例如:0.8.2, 0.9.0.0, 0.10.0,更多细节检查ApiVersion。 通过设置特定的消息格式版本,并且磁盘上的所有现有消息都小于或等于指定版本。 不正确地设置此值将导致消费者使用旧版本,因为他们将接收到“不认识”的格式的消息。

string 0.11.0-IV2 log.message.format.version
min.cleanable.dirty.ratio 此配置控制日志压缩程序将尝试清除日志的频率(假设启用了日志压缩)。

默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。 更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。

double 0.5 [0,...,1] log.cleaner.min.cleanable.ratio
min.compaction.lag.ms 消息在日志中保持不压缩的最短时间。

仅适用于正在压缩的日志。

long 0 [0,...] log.cleaner.min.compaction.lag.ms
min.insync.replicas 当生产者设置应答为"all"(或“-1”)时,此配置指定了成功写入的副本应答的最小数。

如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend) 当min.insync.replicas和acks强制更大的耐用性时。 典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。 如果多数副本没有收到写入,这将确保生产者引发异常。

int 1 [1,...] min.insync.replicas
preallocate 如果我们在创建新的日志段时在磁盘上预分配该文件,那么设为True。 boolean false log.preallocate
retention.bytes 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧的日志段以释放空间。

默认情况下,没有设置大小限制则仅限于时间限制。

long -1 log.retention.bytes
retention.ms 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。

这代表SLA消费者必须读取数据的时间长度。

long 604800000 log.retention.ms
segment.bytes 此配置控制日志的段文件大小。一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 int 1073741824 [14,...] log.segment.bytes
segment.index.bytes 此配置控制offset映射到文件位置的索引的大小。我们预先分配此索引文件,并在日志滚动后收缩它。通常不需要更改此设置。 int 10485760 [0,...] log.index.size.max.bytes
segment.jitter.ms 从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动 long 0 [0,...] log.roll.jitter.ms
segment.ms 此配置控制Kafka强制日志滚动的时间段,以确保保留可以删除或压缩旧数据,即使段文件未满。 long 604800000 [0,...] log.roll.ms
unclean.leader.election.enable 是否将不在ISR中的副本作为最后的手段选举为leader,即使这样做可能会导致数据丢失。 boolean false unclean.leader.election.enable
kafka > 2.0
message.downconversion.enable 此配置控制是否启用消息格式的向下转换以满足消费请求。

当设置为false时,broker不会对期待旧消息格式的消费者执行向下转换。broker 会对来自此类旧客户端的消费请求作出 UNSUPPORTED_VERSION 错误响应。

  • 这个配置不适用于复制到followers时可能需要的任何消息格式转换。
boolean false log.message.downconversion.enable

Producer

详细配置:

名称 描述 类型 默认 有效值 重要程度
kafka >= 0.10版
bootstrap.servers host/port列表,用于初始化建立和Kafka集群的连接。

列表格式为host1:port1,host2:port2,....,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(你可以多提供几个,以防提供的服务器关闭)

list
key.serializer 实现 org.apache.kafka.common.serialization.Serializer 接口的 key 的 Serializer 类。 class
value.serializer 实现 org.apache.kafka.common.serialization.Serializer 接口的value 的 Serializer 类。 class
acks 生产者需要leader确认请求完成之前接收的应答数。

此配置控制了发送消息的耐用性,支持以下配置:

  1. acks=0
    如果设置为0,那么生产者将不等待任何消息确认。消息将立刻添加到socket缓冲区并考虑发送。
    在这种情况下不能保障消息被服务器接收到。并且重试机制不会生效(因为客户端不知道故障了没有)。每个消息返回的offset始终设置为-1。
  2. acks=1,
    这意味着leader写入消息到本地日志就立即响应,而不等待所有follower应答。
    在这种情况下,如果响应消息之后但follower还未复制之前leader立即故障,那么消息将会丢失。
  3. acks=all
    这意味着leader将等待所有副本同步后应答消息。此配置保障消息不会丢失(只要至少有一个同步的副本或者)。
    这是最强壮的可用性保障。等价于acks=-1。
string 1 [all, -1, 0, 1]
buffer.memory 生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。

此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。

long 33554432 [0,...]
compression.type 数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。

压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。

string none
retries 设置一个比零大的值,客户端如果发送失败则会重新发送。

注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。 如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。

int 0 [0,...,2147483647]
ssl.key.password 密钥仓库文件中的私钥的密码。 password null
ssl.keystore.location 密钥仓库文件的位置。可用于客户端的双向认证。 string null
ssl.keystore.password 密钥仓库文件的仓库密码。

只有配置了ssl.keystore.location时才需要。

password null
ssl.truststore.location 信任仓库的位置 string null
ssl.truststore.password 信任仓库文件的密码 password null
batch.size 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。

这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位): 不会打包大于此配置大小的消息。 发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。 较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。

int 16384 [0,...]
client.id 当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。 string ""
connections.max.idle.ms 多少毫秒之后关闭闲置的连接。 long 540000
linger.ms 生产者组将发送的消息组合成单个批量请求。

正常情况下,只有消息到达的速度比发送速度快的情况下才会出现。但是,在某些情况下,即使在适度的负载下,客户端也可能希望减少请求数量。此设置通过添加少量人为延迟来实现。- 也就是说,不是立即发出一个消息,生产者将等待一个给定的延迟,以便和其他的消息可以组合成一个批次。这类似于Nagle在TCP中的算法。此设置给出批量延迟的上限:一旦我们达到分区的batch.size值的记录,将立即发送,不管这个设置如何,但是,如果比这个小,我们将在指定的“linger”时间内等待更多的消息加入。此设置默认为0(即无延迟)。假设,设置 linger.ms=5,将达到减少发送的请求数量的效果,但对于在没有负载情况,将增加5ms的延迟。

long 0 [0,...]
max.block.ms 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。

此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。

long 60000 [0,...]
max.request.size 请求的最大大小(以字节为单位)。

此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这也是最大消息批量大小的上限。 请注意,服务器拥有自己的批量大小,可能与此不同。

int 1048576 [0,...]
partitioner.class 实现Partitioner接口的的Partitioner类。 class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。

如果值为-1,则将使用OS默认值。

int 32768 [-1,...]
request.timeout.ms 该配置控制客户端等待请求响应的最长时间。

如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。

int 30000 [0,...]
sasl.jaas.config JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:'(=)*;' password null
sasl.kerberos.service.name Kafka运行的Kerberos主体名称。

可以在Kafka的JAAS配置或Kafka的配置中定义。

string null
sasl.mechanism SASL机制用于客户端连接。这是安全提供者可用与任何机制。

GSSAPI是默认机制。

string GSSAPI
security.protocol 用于与broker通讯的协议。

有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。

string PLAINTEXT
send.buffer.bytes 发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。

如果值为 -1,将默认使用系统的。

int 131072 [-1,...]
ssl.enabled.protocols 启用SSL连接的协议列表。 list TLSv1.2,TLSv1.1,TLSv1
ssl.keystore.type 密钥存储文件的文件格式。

对于客户端是可选的。

string JKS
ssl.protocol 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。

较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。

string TLS
ssl.provider 用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。 string null
ssl.truststore.type 信任仓库文件的文件格式。 string JKS
enable.idempotence 当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。

即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。 另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。

boolean false
interceptor.classes 实现ProducerInterceptor接口,你可以在生产者发布到Kafka群集之前拦截(也可变更)生产者收到的消息。

默认情况下没有拦截器。

list null
max.in.flight.requests.per.connection 阻塞之前,客户端单个连接上发送的未应答请求的最大数量。

注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。

int 5 [1,...]
metadata.max.age.ms 在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。 long 300000 [0,...]
metric.reporters 用作metrics reporters(指标记录员)的类的列表。

实现MetricReporter接口,将受到新增加的度量标准创建类插入的通知。 JmxReporter始终包含在注册JMX统计信息中。

list ""
metrics.num.samples 维护用于计算度量的样例数量。 int 2 [1,...]
metrics.recording.level 指标的最高记录级别。 string INFO [INFO, DEBUG]
metrics.sample.window.ms 度量样例计算上 long 30000 [0,...]
reconnect.backoff.max.ms 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。

如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。

long 1000 [0,...]
reconnect.backoff.ms 尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。 long 50 [0,...]
retry.backoff.ms 尝试重试指定topic分区的失败请求之前等待的时间。

这样可以避免在某些故障情况下高频次的重复发送请求。

long 100 [0,...]
sasl.kerberos.kinit.cmd Kerberos kinit 命令路径。 string /usr/bin/kinit
sasl.kerberos.min.time.before.relogin Login线程刷新尝试之间的休眠时间。 long 60000
sasl.kerberos.ticket.renew.jitter 添加更新时间的随机抖动百分比。 double 0.05
sasl.kerberos.ticket.renew.window.factor 登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。 double 0.8
ssl.cipher.suites 密码套件列表。这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。 list null
ssl.endpoint.identification.algorithm 使用服务器证书验证服务器主机名的端点识别算法。 string null
ssl.keymanager.algorithm 用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 string SunX509
ssl.secure.random.implementation 用于SSL加密操作的SecureRandom PRNG实现。 string null
ssl.trustmanager.algorithm 用于SSL连接的信任管理因子算法。

默认值是JAVA虚拟机配置的信任管理工厂算法。

string PKIX
transaction.timeout.ms 生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。

如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。

int 60000
transactional.id 用于事务传递的TransactionalId。

这样可以跨多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。 如果没有提供TransactionalId,则生产者被限制为幂等传递。 请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。

string null non-empty string
kafka >= 2.0.0
sasl.client.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 class null 中间
sasl.login.callback.handler.class 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。

对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler

class null
sasl.login.class 实现Login接口的类的全称。

对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。

class null
kafka >= 2.1.0
client.dns.lookup 控制客户端如何使用DNS查询。如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。

一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。

string use_all_dns_ips [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only]
delivery.timeout.ms 调用send()返回后报告成功或失败的时间上限。

这限制了消息在发送前被延迟的总时间,等待broker确认的时间(如果期望的话),以及允许重试发送失败的时间。如果遇到不可恢复的错误,重试次数已经用尽,或者消息被添加到一个达到较早发送到期期限的批次中,生产者可能会报告未能在这个配置之前发送记录。 这个配置的值应该大于或等于request.timeout.ms和linger.ms之和。

int 120000 (2 minutes) [0,...]
kafka >= 2.7
ssl.truststore.certificates 可信证书的格式由'ssl.truststore.type'指定。

默认的SSL引擎工厂只支持带X.509证书的PEM格式。

password null
socket.connection.setup.timeout.max.ms 客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。

为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。

long 127000 (127 seconds)
socket.connection.setup.timeout.ms 客户端等待建立socket连接的时间。

如果在超时之前没有建立连接,客户端将关闭socket通道。

long 10000 (10 seconds)

Broker

基本配置如下:

  • broker.id
  • log.dirs
  • zookeeper.connect

详细配置:

名称 描述 类型 默认 有效值 重要程度
kafka >= 0.10版

Broker

基本配置如下:

  • broker.id
  • log.dirs
  • zookeeper.connect

详细配置:

名称 描述 类型 默认 有效值 重要程度
kafka >= 0.10版

Broker

基本配置如下:

  • broker.id
  • log.dirs
  • zookeeper.connect

详细配置:

名称 描述 类型 默认 有效值 重要程度
kafka >= 0.10版

Broker

基本配置如下:

  • broker.id
  • log.dirs
  • zookeeper.connect

详细配置:

名称 描述 类型 默认 有效值 重要程度
kafka >= 0.10版