Kafka:安装、配置
关于
Kafka 解压即用,并没有繁琐的安装步骤,唯一注意的是其需要 Zookeeper 支持(但其自带有 Zookeeper)。
- 官网下载压缩包:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
- 提取 tar 文件:
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
- 启动服务器:
$ cd kafka_2.11.0.9.0.0 $ bin/kafka-server-start.sh config/server.properties
- 服务器启动后,会得到以下响应:
$ bin/kafka-server-start.sh config/server.properties [2016-01-02 15:37:30,410] INFO KafkaConfig values: request.timeout.ms = 30000 log.roll.hours = 168 inter.broker.protocol.version = 0.9.0.X log.preallocate = false security.inter.broker.protocol = PLAINTEXT ……………………………………………. …………………………………………….
- 停止服务器:
$ bin/kafka-server-stop.sh config/server.properties
NOTE:
- 启动 kafka 需要执行 kafka-server-start.bat 文件,需要传入一个路径参数(server.config 文件的路径):
- 如果使用自行安装的 Zookeeper(先启动 Zookeeper,再启动 Kafka),使用“config/server.properties”。
- 如果使用 Kafka 自带的 Zookeeper(直接启动 Kafka 即可),使用:“config/zookeeper.properties”。
- properties 文件都位于“../kafka_x.xx.x.x.x.x/config”中。
配置
Broker
基本配置如下:
- broker.id
- log.dirs
- zookeeper.connect
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 | 更新模式 |
---|---|---|---|---|---|---|
kafka >= 0.10版 | ||||||
zookeeper.connect | zookeeper host string | string | 高 | |||
advertised.host.name | 过时的:当advertised.listeners或listeners没设置时候才使用。
|
string | null | 高 | ||
advertised.listeners | 发布到Zookeeper供客户端使用监听(如果不同)。在IaaS环境中,broker可能需要绑定不同的接口。如果没设置,则使用listeners。 | string | null | 高 | ||
advertised.port | 过时的:当advertised.listeners或listeners没有设置才使用。
|
int | null | 高 | ||
auto.create.topics.enable | 启用自动创建topic | boolean | true | 高 | ||
auto.leader.rebalance.enable | 启用自动平衡leader。如果需要,后台线程会定期检查并触发leader平衡。 | boolean | true | 高 | ||
background.threads | 用于各种后台处理任务的线程数 | int | 10 | [1,...] | 高 | |
broker.id | 服务器的broker id。
要避免zookeeper生成的broker id和用户配置的broker id冲突,从reserved.broker.max.id + 1开始生成。 |
int | -1 | 高 | ||
compression.type | 为给定topic指定最终的压缩类型。支持标准的压缩编码器('gzip', 'snappy', 'lz4')。也接受'未压缩',就是没有压缩。保留由producer设置的原始的压缩编码。 | string | producer | 高 | ||
delete.topic.enable | 启用删除topic。如果此配置已关闭,通过管理工具删除topic将没有任何效果 | boolean | false | 高 | ||
host.name | 过时的:当listeners没有设置才会使用。
|
string | "" | 高 | ||
leader.imbalance.check.interval.seconds | 由控制器触发分区再平衡检查的频率 | long | 300 | 高 | ||
leader.imbalance.per.broker.percentage | 允许每个broker的leader比例不平衡。如果每个broker的值高于此值,控制器将触发leader平衡,该值以百分比的形式指定。 | int | 10 | 高 | ||
listeners | 监听列表 - 监听逗号分隔的URL列表和协议。
|
string | null | 高 | ||
log.dir | 保存日志数据的目录 (补充log.dirs属性) | string | /tmp/kafka-logs | 高 | ||
log.dirs | 保存日志数据的目录。如果未设置,则使用log.dir中的值 | string | null | 高 | ||
log.flush.interval.messages | 消息刷新到磁盘之前,累计在日志分区的消息数 | long | 9223372036854775807 | [1,...] | 高 | |
log.flush.interval.ms | topic中的消息在刷新到磁盘之前保存在内存中的最大时间(以毫秒为单位),如果未设置,则使用log.flush.scheduler.interval.ms中的值 | null | 高 | |||
log.flush.offset.checkpoint.interval.ms | 我们更新的持续记录的最后一次刷新的频率。作为日志的恢复点。 | int | 60000 | [0,...] | 高 | |
log.flush.scheduler.interval.ms | 日志刷新的频率(以毫秒为单位)检查是否有任何日志需要刷新到磁盘 | long | 9223372036854775807 | 高 | ||
log.retention.bytes | 删除日志之前的最大大小 | long | -1 | 高 | ||
log.retention.hours | 删除日志文件保留的小时数(以小时为单位)。第三级是log.retention.ms属性 | int | 168 | 高 | ||
log.retention.minutes | 删除日志文件之前保留的分钟数(以分钟为单位)。次于log.retention.ms属性。如果没设置,则使用log.retention.hours的值。 | int | null | 高 | ||
log.retention.ms | 删除日志文件之前保留的毫秒数(以毫秒为单位),如果未设置,则使用log.retention.minutes的值。 | long | null | 高 | ||
log.roll.hours | 新建一个日志段的最大时间(以小时为单位),次于log.roll.ms属性 | int | 168 | [1,...] | 高 | |
log.roll.jitter.hours | 从logRollTimeMillis(以小时为单位)减去最大抖动,次于log.roll.jitter.ms属性。 | int | 0 | [0,...] | 高 | |
log.roll.ms | 新建一个日志段之前的最大事时间(以毫秒为单位)。如果未设置,则使用log.roll.hours的值。 | long | null | 高 | ||
log.segment.bytes | 单个日志文件的最大大小 | int | 1073741824 | [14,...] | 高 | |
log.segment.delete.delay.ms | 从文件系统中删除文件之前的等待的时间 | long | 60000 | [0,...] | 高 | |
message.max.bytes | 服务器可以接收的消息的最大大小 | int | 1000012 | [0,...] | 高 | |
min.insync.replicas | 当producer设置acks为"all"(或"-1")时。min.insync.replicas指定必须应答成功写入的replicas最小数。
|
int | 1 | [1,...] | 高 | |
num.io.threads | 服务器用于执行网络请求的io线程数 | int | 8 | [1,...] | 高 | |
num.network.threads | 服务器用于处理网络请求的线程数。 | int | 3 | [1,...] | 高 | |
num.recovery.threads.per.data.dir | 每个数据的目录线程数,用于启动时日志恢复和关闭时flush。 | int | 1 | [1,...] | 高 | |
num.replica.fetchers | 从源broker复制消息的提取线程数。递增该值可提高 follower broker的I/O的并发。 | int | 1 | 高 | ||
offset.metadata.max.bytes | offset提交关联元数据条目的最大大小 | int | 4096 | 高 | ||
offsets.commit.required.acks | commit之前需要的应答数,通常,不应覆盖默认的(-1) | short | -1 | 高 | ||
offsets.commit.timeout.ms | Offset提交延迟,直到所有副本都收到提交或超时。 这类似于生产者请求超时。 | int | 5000 | [1,...] | 高 | |
offsets.load.buffer.size | 当加载offset到缓存时,从offset段读取的批量大小。 | int | 5242880 | [1,...] | 高 | |
offsets.retention.check.interval.ms | 检查过期的offset的频率。 | long | 600000 | [1,...] | 高 | |
offsets.retention.minutes | offset topic的日志保留时间(分钟) | int | 1440 | [1,...] | 高 | |
offsets.topic.compression.codec | 压缩编码器的offset topic - 压缩可以用于实现“原子”提交 | int | 0 | 高 | ||
offsets.topic.num.partitions | offset commit topic的分区数(部署之后不应更改) | int | 50 | [1,...] | 高 | |
offsets.topic.replication.factor | offset topic复制因子(ps:就是备份数,设置的越高来确保可用性)。
为了确保offset topic有效的复制因子,第一次请求offset topic时,活的broker的数量必须最少最少是配置的复制因子数。 如果不是,offset topic将创建失败或获取最小的复制因子(活着的broker,复制因子的配置) |
short | 3 | [1,...] | 高 | |
offsets.topic.segment.bytes | offset topic段字节应该相对较小一点,以便于加快日志压缩和缓存加载 | int | 104857600 | [1,...] | 高 | |
port | 过时的:当listener没有设置才使用。请改用listeners。该port监听和接收连接。 | int | 9092 | 高 | ||
queued.max.requests | 在阻塞网络线程之前允许的排队请求数 | int | 500 | [1,...] | 高 | |
quota.consumer.default | 过时的:当默认动态的quotas没有配置或在Zookeeper时。如果每秒获取的字节比此值高,所有消费者将通过clientId/consumer区分限流。 | long | 9223372036854775807 | [1,...] | 高 | |
quota.producer.default | 过时的:当默认动态的quotas没有配置,或在zookeeper时。如果生产者每秒比此值高,所有生产者将通过clientId区分限流。 | long | 9223372036854775807 | [1,...] | 高 | |
replica.fetch.min.bytes Minimum | 每个获取响应的字节数。如果没有满足字节数,等待replicaMaxWaitTimeMs。 | int | 1 | 高 | ||
replica.fetch.wait.max.ms | 跟随者副本发出每个获取请求的最大等待时间,此值应始终小于replica.lag.time.max.ms,以防止低吞吐的topic的ISR频繁的收缩。 | int | 500 | 高 | ||
replica.high.watermark.checkpoint.interval.ms | 达到高“水位”保存到磁盘的频率。 | long | 5000 | 高 | ||
replica.lag.time.max.ms | 如果一个追随者没有发送任何获取请求或至少在这个时间的这个leader的没有消费完。该leader将从isr中移除这个追随者。 | long | 10000 | 高 | ||
replica.socket.receive.buffer.bytes | 用于网络请求的socket接收缓存区 | int | 65536 | 高 | ||
replica.socket.timeout.ms | 网络请求的socket超时,该值最少是replica.fetch.wait.max.ms | int | 30000 | 高 | ||
request.timeout.ms | 该配置控制客户端等待请求的响应的最大时间。
如果超过时间还没收到消费。客户端将重新发送请求,如果重试次数耗尽,则请求失败。 |
int | 30000 | 高 | ||
socket.receive.buffer.bytes | socket服务的SO_RCVBUF缓冲区。如果是-1,则默认使用OS的。 | int | 102400 | 高 | ||
socket.request.max.bytes | socket请求的最大字节数 | int | 104857600 | [1,...] | 高 | |
socket.send.buffer.bytes | socket服务的SO_SNDBUF缓冲区。如果是-1,则默认使用OS的。 | int | 102400 | 高 | ||
unclean.leader.election.enable | 是否启用不在ISR中的副本参与选举leader的最后的手段。这样做有可能丢失数据。 | boolean | true | 高 | ||
zookeeper.connection.timeout.ms | 连接zookeeper的最大等待时间,如果未设置,则使用zookeeper.session.timeout.ms。 | int | null | 高 | ||
zookeeper.session.timeout.ms | Zookeeper会话的超时时间 | int | 6000 | 高 | ||
zookeeper.set.acl | 设置客户端使用安全的ACL | boolean | false | 高 | ||
broker.id.generation.enable | 启用自动生成broker id。启用该配置时应检查reserved.broker.max.id。 | boolean | true | 中 | ||
broker.rack | broker机架,用于机架感知副本分配的失败容错。例如:RACK1, us-east-1d | string | null | 中 | ||
connections.max.idle.ms Idle | 连接超时:闲置时间超过该设置,则服务器socket处理线程关闭这个连接。 | long | 600000 | 中 | ||
controlled.shutdown.enable | 启用服务器的关闭控制。 | boolean | true | 中 | ||
controlled.shutdown.max.retries | 控制因多种原因导致的shutdown失败,当这样失败发生,尝试重试的次数 | int | 3 | 中 | ||
controlled.shutdown.retry.backoff.ms | 在每次重试之前,系统需要时间从导致先前故障的状态(控制器故障转移,复制延迟等)恢复。 此配置是重试之前等待的时间数。 | long | 5000 | 中 | ||
controller.socket.timeout.ms | 控制器到broker通道的sockt超时时间 | int | 30000 | 中 | ||
default.replication.factor | 自动创建topic的默认的副本数 | int | 1 | 中 | ||
fetch.purgatory.purge.interval.requests | 拉取请求清洗间隔(请求数) | int | 1000 | 中 | ||
group.max.session.timeout.ms | 已注册的消费者允许的最大会话超时时间,设置的时候越长使消费者有更多时间去处理心跳之间的消息。但察觉故障的时间也拉长了。 | int | 300000 | 中 | ||
group.min.session.timeout.ms | 已经注册的消费者允许最小的会话超时时间,更短的时间去快速的察觉到故障,代价是频繁的心跳,这可能会占用大量的broker资源。 | int | 6000 | 中 | ||
inter.broker.protocol.version | 指定broker内部通讯使用的版本。通常在更新broker时使用。有效的值为:0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1。查看ApiVersion找到的全部列表。 | string | 0.10.1-IV2 | 中 | ||
log.cleaner.backoff.ms | 当没有日志要清理时,休眠的时间 | long | 15000 | [0,...] | 中 | |
log.cleaner.dedupe.buffer.size | 用于日志去重的内存总量(所有cleaner线程) | long | 134217728 | 中 | ||
log.cleaner.delete.retention.ms | 删除记录保留多长时间? | long | 86400000 | 中 | ||
log.cleaner.enable | 在服务器上启用日志清洗处理?如果使用的任何topic的cleanup.policy=compact包含内部的offset topic,应启动。如果禁用,那些topic将不会被压缩并且会不断的增大。 | boolean | true | 中 | ||
log.cleaner.io.buffer.load.factor | 日志cleaner去重缓冲负载因子。去重缓冲区的百分比,较高的值将允许同时清除更多的日志,但将会导致更多的hash冲突。 | double | 0.9 | 中 | ||
log.cleaner.io.buffer.size | 所有日志清洁器线程I/O缓存的总内存 | int | 524288 | [0,...] | 中 | |
log.cleaner.io.max.bytes.per.second | 日志清理器限制,以便其读写i/o平均小与此值。 | double | 1.7976931348623157E308 | 中 | ||
log.cleaner.min.cleanable.ratio | 脏日志与日志的总量的最小比率,以符合清理条件 | double | 0.5 | 中 | ||
log.cleaner.min.compaction.lag.ms | 一条消息在日志保留不压缩的最小时间,仅适用于正在压缩的日志。 | long | 0 | 中 | ||
log.cleaner.threads | 用于日志清除的后台线程数 | int | 1 | [0,...] | 中 | |
log.cleanup.policy | 超过保留时间段的默认清除策略。逗号分隔的有效的策略列表。有效的策略有:“delete”和“compact” | list | [delete] | [compact, delete] | 中 | |
log.index.interval.bytes | 添加一个条目到offset的间隔 | int | 4096(4 kibibytes) | [0,...] | 中 | |
log.index.size.max.bytes | offset index的最大大小(字节) | int | 10485760 | [4,...] | 中 | |
log.message.format.version | 指定追加到日志中的消息格式版本。例如: 0.8.2, 0.9.0.0, 0.10.0。
通过设置一个特定消息格式版本,用户需要保证磁盘上所有现有的消息小于或等于指定的版本。错误的设置将导致旧版本的消费者中断,因为消费者接收一个不理解的消息格式。 |
string | 0.10.1-IV2 | 中 | ||
log.message.timestamp.difference.max.ms | 如果log.message.timestamp.type=CreateTime,broker接收消息时的时间戳和消息中指定的时间戳之间允许的最大差异。
|
long | 9223372036854775807 | [0,...] | 中 | |
log.message.timestamp.type | 定义消息中的时间戳是消息创建时间或日志追加时间。该值可设置为CreateTime 或 LogAppendTime | string | CreateTime | [CreateTime, LogAppendTime] | 中 | |
log.preallocate | 在创建新段时预分配文件?如果你在Windowns上使用kafka,你可能需要设置它为true。 | boolean | false | 中 | ||
log.retention.check.interval.ms | 日志清除程序检查日志是否满足被删除的频率(以毫秒为单位) | long | 300000 | [1,...] | 中 | |
max.connections.per.ip | 允许每个ip地址的最大连接数。 | int | 2147483647 | [1,...] | 中 | |
max.connections.per.ip.overrides | per-ip或hostname覆盖默认最大连接数 | string | "" | 中 | ||
num.partitions | topic的默认分区数 | int | 1 | [1,...] | 中 | |
principal.builder.class | 实现PrincipalBuilder接口类的完全限定名,该接口目前用于构建与SSL SecurityProtocol连接的Principal。 | class | class org。apache。kafka。common。security。auth。DefaultPrincipalBuilder(wiki傻逼编辑器:。换位.) | 中 | ||
producer.purgatory.purge.interval.requests | 生产者请求purgatory的清洗间隔(请求数) | int | 1000 | 中 | ||
replica.fetch.backoff.ms | 当拉取分区发生错误时休眠的时间 | int | 1000 | [0,...] | 中 | |
replica.fetch.max.bytes | 拉取每个分区的消息的字节数。这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 | int | 1048576 | [0,...] | 中 | |
replica.fetch.response.max.bytes | 预计整个获取响应的最大字节数,这不是绝对的最大值,如果提取的第一个非空分区中的第一个消息大于这个值,则消息仍然返回,以确保进展。
通过message.max.bytes (broker配置)或max.message.bytes (topic配置)定义broker接收的最大消息大小。 |
int | 10485760 | [0,...] | 中 | |
reserved.broker.max.id | broker.id的最大数 | int | 1000 | [0,...] | 中 | |
sasl.enabled.mechanisms | 可用的SASL机制列表,包含任何可用的安全提供程序的机制。默认情况下只有GSSAPI是启用的。 | list | [GSSAPI] | 中 | ||
sasl.kerberos.kinit.cmd | Kerberos kinit 命令路径。 | string | /usr/bin/kinit | 中 | ||
sasl.kerberos.min.time.before.relogin | 登录线程在刷新尝试的休眠时间。 | long | 60000 | 中 | ||
sasl.kerberos.principal.to.local.rules | principal名称映射到一个短名称(通常是操作系统用户名)。按顺序,使用与principal名称匹配的第一个规则将其映射其到短名称。忽略后面的规则。
默认情况下,{username}/{hostname}@{REALM} 映射到 {username}。 |
list | [DEFAULT] | 中 | ||
sasl.kerberos.service.name | Kafka运行的Kerberos principal名称。可以在JAAS或Kafka的配置文件中定义。 | string | null | 中 | ||
sasl.kerberos.ticket.renew.jitter | 添加到更新时间的随机抖动的百分比 | time. double | 0.05 | 中 | ||
sasl.kerberos.ticket.renew.window.factor | 登录线程休眠,直到从上次刷新到ticket的到期的时间已到达(指定窗口因子),在此期间它将尝试更新ticket。 | double | 0.8 | 中 | ||
sasl.mechanism.inter.broker.protocol | SASL机制,用于broker之间的通讯,默认是GSSAPI。 | string | GSSAPI | 中 | ||
security.inter.broker.protocolSecurity | broker之间的通讯协议,有效值有:PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL。 | string | PLAINTEXT | 中 | ||
ssl.cipher.suites | 密码套件列表。认证,加密,MAC和秘钥交换算法的组合,用于使用TLS或SSL的网络协议交涉网络连接的安全设置,默认情况下,支持所有可用的密码套件。 | list | null | 中 | ||
ssl.client.auth | 配置请求客户端的broker认证。
常见的设置:
|
string | none | [required, requested, none] | 中 | |
ssl.enabled.protocols | 已启用的SSL连接协议列表。 | list | [TLSv1.2, TLSv1.1, TLSv1] | 中 | ||
ssl.key.password | 秘钥库文件中的私钥密码。对客户端是可选的。 | password | null | 中 | ||
ssl.keymanager.algorithm | 用于SSL连接的密钥管理工厂算法。默认值是Java虚拟机的密钥管理工厂算法。 | string | SunX509 | 中 | ||
ssl.keystore.location | 密钥仓库文件的位置。客户端可选,并可用于客户端的双向认证。 | string | null | 中 | ||
ssl.keystore.password | 密钥仓库文件的仓库密码。客户端可选,只有ssl.keystore.location配置了才需要。 | password | null | 中 | ||
ssl.keystore.type | 密钥仓库文件的格式。客户端可选。 | string | JKS | 中 | ||
ssl.protocol | 用于生成SSLContext,默认是TLS,适用于大多数情况。允许使用最新的JVM,LS, TLSv1.1 和TLSv1.2。 SSL,SSLv2和SSLv3 老的JVM也可能支持,由于有已知的安全漏洞,不建议使用。 | string | TLS | 中 | ||
ssl.provider | 用于SSL连接的安全提供程序的名称。默认值是JVM的安全程序。 | string | null | 中 | ||
ssl.trustmanager.algorithm | 信任管理工厂用于SSL连接的算法。默认为Java虚拟机配置的信任算法。 | string | PKIX | 中 | ||
ssl.truststore.location | 信任仓库文件的位置 | string | null | 中 | ||
ssl.truststore.password | 信任仓库文件的密码 | password | null | 中 | ||
ssl.truststore.type | 信任仓库文件的文件格式 | string | JKS | 中 | ||
authorizer.class.name | 用于认证的授权程序类 | string | "" | 低 | ||
metric.reporters | 度量报告的类列表,通过实现MetricReporter接口,允许插入新度量标准类。JmxReporter包含注册JVM统计。 | list | [] | 低 | ||
metrics.num.samples | 维持计算度量的样本数。 | int | 2 | [1,...] | 低 | |
metrics.sample.window.ms | 计算度量样本的时间窗口 | long | 30000 | [1,...] | 低 | |
quota.window.num | 在内存中保留客户端限额的样本数 | int | 11 | [1,...] | 低 | |
quota.window.size.seconds | 每个客户端限额的样本时间跨度 | int | 1 | [1,...] | 低 | |
replication.quota.window.num | 在内存中保留副本限额的样本数 | int | 11 | [1,...] | 低 | |
replication.quota.window.size.seconds | 每个副本限额样本数的时间跨度 | int | 1 | [1,...] | 低 | |
ssl.endpoint.identification.algorithm | 端点身份标识算法,使用服务器证书验证服务器主机名。 | string | null | 低 | ||
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | 低 | ||
zookeeper.sync.time.ms | ZK follower可落后与leader多久。 | int | 2000 | 低 | ||
以下是kafka新版本的增量配置 | ||||||
kafka >= 1.0 | ||||||
group.initial.rebalance.delay.ms | 分组协调器在执行第一次重新平衡之前,等待更多消费者加入新组的时间。延迟时间越长,意味着重新平衡的次数可能越少,但会增加处理开始前的时间。 | int | 3000 | 中 | 只读 | |
transaction.abort.timed.out.transaction.cleanup.interval.ms | 回滚已超时的事务的时间间隔。 | int | 10000 (10 seconds) | [1,...] | 低 | 只读 |
transaction.remove.expired.transaction.cleanup.interval.ms | 删除因transactional.id.expiration.ms过期的事务的时间间隔。 | int | 3600000 (1 hour) | [1,...] | 低 | 只读 |
transaction.max.timeout.ms | 事务的最大允许超时时间。如果客户端请求的交易时间超过了这个时间,那么broker将在InitProducerIdRequest中返回一个错误。
这可以防止客户端的超时时间过大,从而阻滞消费者从事务中包含的主题中读取。 |
int | 900000 (15 minutes) | [1,...] | 高 | 只读 |
transaction.state.log.load.buffer.size | 在将生产者id和事务加载到缓存中时,从事务日志段读取的批次大小(软限制,如果消息太大,则重写) | int | 5242880 | [1,...] | 高 | 只读 |
transaction.state.log.min.isr | 覆盖事务topic的min.insync.replicas配置。 | int | 2 | [1,...] | 高 | 只读 |
transaction.state.log.num.partitions | 事务topic的分区数(部署后不应改变)。 | int | 50 | [1,...] | 高 | 只读 |
transaction.state.log.replication.factor | 事务topic的复制因子(设置较高来确保可用性)。内部topic创建将失败,直到集群规模满足该复制因子要求。 | short | 3 | [1,...] | 高 | 只读 |
transaction.state.log.segment.bytes | 事务topic段的字节数应保持相对较小,以利于加快日志压缩和缓存加载速度 | int | 104857600 (100 mebibytes) | [1,...] | 高 | 只读 |
transactional.id.expiration.ms | 事务协调器在没有收到当前事务的任何事务状态更新的情况下,在其事务id过期前等待的时间,单位为ms。
这个设置也会影响生产者id过期:一旦这个时间在给定的生产者id最后一次写入后过去,生产者id就会过期。
|
int | 604800000 (7 days) | [1,...] | 高 | 只读 |
kafka >= 2.0 | ||||||
sasl.client.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 | class | null | 中间 | 只读 | |
sasl.login.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。
例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class | null | 中间 | 只读 | |
sasl.login.class | 实现Login接口的类的全称。对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。
例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。 |
class | null | 中间 | 只读 | |
kafka >= 2.5 | ||||||
zookeeper.clientCnxnSocket | 当使用TLS连接到ZooKeeper时,通常设置为org.apache.zookeeper.ClientCnxnSocketNetty。
覆盖任何同名的zookeeper.clientCnxnSocket设置的显式值。 |
string | null | 中间 | 只读 | |
zookeeper.ssl.client.enable | 设置客户端连接到ZooKeeper时使用TLS。
显式的值会覆盖任何通过zookeeper.client.secure设置的值(注意名称不同)。 如果两者都没有设置,默认为false; 当为true时,必须设置zookeeper.clientCnxnSocket(通常为org.apache.zookeeper.ClientCnxnSocketNetty); 其他需要设置的值可能包括zookeeper.ssl.cipher.suites、zookeeper.ssl.crl.enable、zookeeper.ssl.enabled.protocols、zookeeper.ssl.endpoint. identification.algorithm,zookeeper.ssl.keystore.location,zookeeper.ssl.keystore.password,zookeeper.ssl.keystore.type,zookeeper.ssl. ocsp.enable, zookeeper.ssl.protocol, zookeeper.ssl.truststore.location, zookeeper.ssl.truststore.password, zookeeper.ssl.truststore.type。 |
boolean | false | 中间 | 只读 | |
zookeeper.ssl.keystore.location | 当使用客户端证书与TLS连接到ZooKeeper时的keystore位置。
覆盖任何通过zookeeper.ssl.keyStore.location系统属性设置的显式值(注意是驼峰大小)。 |
password | null | 中间 | 只读 | |
zookeeper.ssl.keystore.password | 当使用客户端证书与TLS连接到ZooKeeper时的keystore密码。覆盖任何通过`zookeeper.ssl.keyStore.password系统属性设置的显式值(注意驼峰大写)。 注意,ZooKeeper不支持与keystore密码不同的密钥密码,所以一定要将keystore中的密钥密码设置为与keystore密码相同,否则连接Zookeeper的尝试将失败。 | password | null | 中间 | 只读 | |
zookeeper.ssl.keystore.type | 当使用客户端证书与TLS连接到ZooKeeper时的keystore类型。覆盖任何通过zookeeper.ssl.keyStore.type系统属性设置的显式值(注意骆驼大写)。
默认值为null意味着该类型将根据keystore的文件扩展名自动检测。 |
string | null | 中间 | 只读 | |
zookeeper.ssl.protocol | 指定ZooKeeper TLS协商中使用的协议。
一个显式的值会覆盖任何通过同名的zookeeper.ssl.protocol系统设置的值。 |
string | TLSv1.2 | 低 | 只读 | |
zookeeper.ssl.cipher.suites | 指定在ZooKeeper TLS协商中使用的密码套件(csv),覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意单字 "ciphersuites")。
覆盖任何通过zookeeper.ssl.ciphersuites系统属性设置的显式值(注意 "ciphersuites "这个单字)。 默认值为 "null "意味着启用的密码套件列表是由正在使用的Java运行时决定的。 |
boolean | false | 低 | 只读 | |
zookeeper.ssl.crl.enable | 指定是否启用ZooKeeper TLS协议中的证书撤销列表。
覆盖任何通过zookeeper.ssl.crl系统属性设置的显式值(注意是短名)。 |
boolean | false | 低 | 只读 | |
zookeeper.ssl.enabled.protocols | 指定ZooKeeper TLS协商(csv)中启用的协议。
覆盖任何通过zookeeper.ssl.enabledProtocols系统属性设置的显式值(注意骆驼大写)。 默认值为 "null "意味着启用的协议将是zookeeper.ssl.protocol配置属性的值。 |
list | null | 低 | 只读 | |
zookeeper.ssl.endpoint.identification.algorithm | 指定是否在ZooKeeper TLS协商过程中启用主机名验证,(不区分大小写)"https "表示启用ZooKeeper主机名验证,显式的空白值表示禁用(仅为测试目的建议禁用)。
明确的值会覆盖任何通过zookeeper.ssl.hostnameVerification系统属性设置的 "true "或 "false "值(注意不同的名称和值;true意味着https,false意味着空白)。 |
string | HTTPS | 低 | 只读 | |
zookeeper.ssl.ocsp.enable | 指定是否启用ZooKeeper TLS协议中的在线证书状态协议。
覆盖任何通过zookeeper.ssl.ocsp系统属性设置的显式值(注意是短名)。 |
boolean | false | 低 | 只读 | |
kafka >= 2.7 | ||||||
ssl.truststore.certificates | 可信证书的格式由'ssl.truststore.type'指定。
默认的SSL引擎工厂只支持带X.509证书的PEM格式。 |
password | null | 中间 | 每个broker | |
socket.connection.setup.timeout.max.ms | 客户端等待建立socket连接的最大时间。
连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。 为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 |
long | 127000 (127 seconds) | 中间 | 只读 | |
socket.connection.setup.timeout.ms | 客户端等待建立socket连接的时间。
如果在超时之前没有建立连接,客户端将关闭socket通道。 |
long | 10000 (10 seconds) | 中间 | 只读 |
Topic
与topic相关的配置,服务器的默认值,也可选择的覆盖指定的topic。
- 如果没有给出指定topic的配置,则将使用服务器默认值。
- 可以通过“-config”选项在topic创建时设置。
此示例使用自定义最大消息的大小和刷新率,创建一个名为 my-topic 的topic:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
也可以使用“alter configs”命令修改或设置。 此示例修改更新 my-topic 的最大的消息大小:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000
可以执行以下命令验证结果:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
移除设置:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 服务器默认属性 | 更新模式 |
---|---|---|---|---|---|---|
cleanup.policy | “delete”或“compact”。指定在旧的日志段的保留策略。
默认策略(“delete”),将达到保留时间或大小限制的日志废弃。 “compact”则压缩日志。 |
list | delete | [compact, delete] | log.cleanup.policy | 中 |
compression.type | 针对指定的topic设置最终的压缩方式。
标准的压缩格式有'gzip', 'snappy', lz4。还可以设置'uncompressed',就是不压缩;设置为'producer'这意味着保留生产者设置的原始压缩编解码。 |
string | producer | [uncompressed, snappy, lz4, gzip, producer] | compression.type | 中 |
delete.retention.ms | 保留删除消息压缩topic的删除标记的时间。此设置还给出消费者如果从offset 0开始读取并确保获得最终阶段的有效快照的时间范围(否则,在完成扫描之前可能已经回收了)。 | long | 86400000 | [0,...] | log.cleaner.delete.retention.ms | 中 |
file.delete.delay.ms | 从文件系统中删除文件之前等待的时间 | long | 60000 | [0,...] | log.segment.delete.delay.ms | 中 |
flush.messages | 此设置允许指定我们强制fsync写入日志的数据的间隔。
例如,如果这被设置为1,我们将在每个消息之后fsync; 如果是5,我们将在每五个消息之后fsync。
|
long | 9223372036854775807 | [0,...] | log.flush.interval.messages | 中 |
flush.ms | 此设置允许我们强制fsync写入日志的数据的时间间隔。
例如,如果这设置为1000,那么在1000ms过去之后,我们将fsync。
|
long | 9223372036854775807 | [0,...] | log.flush.interval.ms | 中 |
follower.replication.throttled.replicas | follower复制限流列表。
该列表应以[PartitionId]的形式描述一组副本:[BrokerId],[PartitionId]:[BrokerId]:...或者通配符'*'可用于限制此topic的所有副本。 |
list | "" | [partitionId],[brokerId]:[partitionId],[brokerId]:... | follower.replication.throttled.replicas | 中 |
index.interval.bytes | 此设置控制Kafka向其offset索引添加索引条目的频率。默认设置确保我们大致每4096个字节索引消息。
更多的索引允许读取更接近日志中的确切位置,但使索引更大。你不需要改变这个值。 |
int | 4096 | [0,...] | log.index.interval.bytes | 中 |
leader.replication.throttled.replicas | 在leader方面进行限制的副本列表。
该列表设置以[PartitionId]的形式描述限制副本:[PartitionId]:[BrokerId],[PartitionId]:[BrokerId]:...或使用通配符‘*’限制该topic的所有副本。 |
list | "" | [partitionId],[brokerId]:[partitionId],[brokerId]:... | leader.replication.throttled.replicas | 中 |
max.message.bytes | kafka允许的最大的消息批次大小。
如果增加此值,并且消费者的版本比0.10.2老,那么消费者的提取的大小也必须增加,以便他们可以获取大的消息批次。 在最新的消息格式版本中,消息总是分组批量来提高效率。 在之前的消息格式版本中,未压缩的记录不会分组批量,并且此限制仅适用于该情况下的单个消息。 |
int | 1000012 | [0,...] | message.max.bytes | 中 |
message.format.version | 指定消息附加到日志的消息格式版本。
该值应该是一个有效的ApiVersion。例如:0.8.2, 0.9.0.0, 0.10.0,更多细节检查ApiVersion。 通过设置特定的消息格式版本,并且磁盘上的所有现有消息都小于或等于指定版本。 不正确地设置此值将导致消费者使用旧版本,因为他们将接收到“不认识”的格式的消息。 |
string | 0.11.0-IV2 | log.message.format.version | 中 | |
min.cleanable.dirty.ratio | 此配置控制日志压缩程序将尝试清除日志的频率(假设启用了日志压缩)。
默认情况下,我们将避免清理超过50%日志被压缩的日志。 该比率限制日志中浪费的最大空间重复(在最多50%的日志中可以是重复的50%)。 更高的比率意味着更少,更有效的清洁,但意味着日志中的浪费更多。 |
double | 0.5 | [0,...,1] | log.cleaner.min.cleanable.ratio | 中 |
min.compaction.lag.ms | 消息在日志中保持不压缩的最短时间。
仅适用于正在压缩的日志。 |
long | 0 | [0,...] | log.cleaner.min.compaction.lag.ms | 中 |
min.insync.replicas | 当生产者设置应答为"all"(或“-1”)时,此配置指定了成功写入的副本应答的最小数。
如果没满足此最小数,则生产者将引发异常(NotEnoughReplicas或NotEnoughReplicasAfterAppend) 当min.insync.replicas和acks强制更大的耐用性时。 典型的情况是创建一个副本为3的topic,将min.insync.replicas设置为2,并设置acks为“all”。 如果多数副本没有收到写入,这将确保生产者引发异常。 |
int | 1 | [1,...] | min.insync.replicas | 中 |
preallocate | 如果我们在创建新的日志段时在磁盘上预分配该文件,那么设为True。 | boolean | false | log.preallocate | 中 | |
retention.bytes | 如果我们使用“删除”保留策略,则此配置将控制日志可以增长的最大大小,之后我们将丢弃旧的日志段以释放空间。
默认情况下,没有设置大小限制则仅限于时间限制。 |
long | -1 | log.retention.bytes | 中 | |
retention.ms | 如果我们使用“删除”保留策略,则此配置控制我们将保留日志的最长时间,然后我们将丢弃旧的日志段以释放空间。
这代表SLA消费者必须读取数据的时间长度。 |
long | 604800000 | log.retention.ms | 中 | |
segment.bytes | 此配置控制日志的段文件大小。一次保留和清理一个文件,因此较大的段大小意味着较少的文件,但对保留率的粒度控制较少。 | int | 1073741824 | [14,...] | log.segment.bytes | 中 |
segment.index.bytes | 此配置控制offset映射到文件位置的索引的大小。我们预先分配此索引文件,并在日志滚动后收缩它。通常不需要更改此设置。 | int | 10485760 | [0,...] | log.index.size.max.bytes | 中 |
segment.jitter.ms | 从计划的段滚动时间减去最大随机抖动,以避免异常的段滚动 | long | 0 | [0,...] | log.roll.jitter.ms | 中 |
segment.ms | 此配置控制Kafka强制日志滚动的时间段,以确保保留可以删除或压缩旧数据,即使段文件未满。 | long | 604800000 | [0,...] | log.roll.ms | 中 |
unclean.leader.election.enable | 是否将不在ISR中的副本作为最后的手段选举为leader,即使这样做可能会导致数据丢失。 | boolean | false | unclean.leader.election.enable | 中 | |
kafka > 2.0 | ||||||
message.downconversion.enable | 此配置控制是否启用消息格式的向下转换以满足消费请求。
当设置为false时,broker不会对期待旧消息格式的消费者执行向下转换。broker 会对来自此类旧客户端的消费请求作出 UNSUPPORTED_VERSION 错误响应。
|
boolean | false | log.message.downconversion.enable | 低 |
Producer
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
bootstrap.servers | host/port列表,用于初始化建立和Kafka集群的连接。
列表格式为host1:port1,host2:port2,....,无需添加所有的集群地址,kafka会根据提供的地址发现其他的地址(你可以多提供几个,以防提供的服务器关闭) |
list | 高 | ||
key.serializer | 实现 org.apache.kafka.common.serialization.Serializer 接口的 key 的 Serializer 类。 | class | 高 | ||
value.serializer | 实现 org.apache.kafka.common.serialization.Serializer 接口的value 的 Serializer 类。 | class | 高 | ||
acks | 生产者需要leader确认请求完成之前接收的应答数。
此配置控制了发送消息的耐用性,支持以下配置:
|
string | 1 | [all, -1, 0, 1] | 高 |
buffer.memory | 生产者用来缓存等待发送到服务器的消息的内存总字节数。如果消息发送比可传递到服务器的快,生产者将阻塞max.block.ms之后,抛出异常。
此设置应该大致的对应生产者将要使用的总内存,但不是硬约束,因为生产者所使用的所有内存都用于缓冲。一些额外的内存将用于压缩(如果启动压缩),以及用于保持发送中的请求。 |
long | 33554432 | [0,...] | 高 |
compression.type | 数据压缩的类型。默认为空(就是不压缩)。有效的值有 none,gzip,snappy, 或 lz4。
压缩全部的数据批,因此批的效果也将影响压缩的比率(更多的批次意味着更好的压缩)。 |
string | none | 高 | |
retries | 设置一个比零大的值,客户端如果发送失败则会重新发送。
注意,这个重试功能和客户端在接到错误之后重新发送没什么不同。 如果max.in.flight.requests.per.connection没有设置为1,有可能改变消息发送的顺序,因为如果2个批次发送到一个分区中,并第一个失败了并重试,但是第二个成功了,那么第二个批次将超过第一个。 |
int | 0 | [0,...,2147483647] | 高 |
ssl.key.password | 密钥仓库文件中的私钥的密码。 | password | null | 高 | |
ssl.keystore.location | 密钥仓库文件的位置。可用于客户端的双向认证。 | string | null | 高 | |
ssl.keystore.password | 密钥仓库文件的仓库密码。
只有配置了ssl.keystore.location时才需要。 |
password | null | 高 | |
ssl.truststore.location | 信任仓库的位置 | string | null | 高 | |
ssl.truststore.password | 信任仓库文件的密码 | password | null | 高 | |
batch.size | 当多个消息要发送到相同分区的时,生产者尝试将消息批量打包在一起,以减少请求交互。
这样有助于客户端和服务端的性能提升。该配置的默认批次大小(以字节为单位): 不会打包大于此配置大小的消息。 发送到broker的请求将包含多个批次,每个分区一个,用于发送数据。 较小的批次大小有可能降低吞吐量(批次大小为0则完全禁用批处理)。 一个非常大的批次大小可能更浪费内存。因为我们会预先分配这个资源。 |
int | 16384 | [0,...] | 中 |
client.id | 当发出请求时传递给服务器的id字符串。这样做的目的是允许服务器请求记录记录这个【逻辑应用名】,这样能够追踪请求的源,而不仅仅只是ip/prot。 | string | "" | 中 | |
connections.max.idle.ms | 多少毫秒之后关闭闲置的连接。 | long | 540000 | 中 | |
linger.ms | 生产者组将发送的消息组合成单个批量请求。
正常情况下,只有消息到达的速度比发送速度快的情况下才会出现。但是,在某些情况下,即使在适度的负载下,客户端也可能希望减少请求数量。此设置通过添加少量人为延迟来实现。- 也就是说,不是立即发出一个消息,生产者将等待一个给定的延迟,以便和其他的消息可以组合成一个批次。这类似于Nagle在TCP中的算法。此设置给出批量延迟的上限:一旦我们达到分区的batch.size值的记录,将立即发送,不管这个设置如何,但是,如果比这个小,我们将在指定的“linger”时间内等待更多的消息加入。此设置默认为0(即无延迟)。假设,设置 linger.ms=5,将达到减少发送的请求数量的效果,但对于在没有负载情况,将增加5ms的延迟。 |
long | 0 | [0,...] | 中 |
max.block.ms | 该配置控制 KafkaProducer.send() 和 KafkaProducer.partitionsFor() 将阻塞多长时间。
此外这些方法被阻止,也可能是因为缓冲区已满或元数据不可用。在用户提供的序列化程序或分区器中的锁定不会计入此超时。 |
long | 60000 | [0,...] | 中 |
max.request.size | 请求的最大大小(以字节为单位)。
此设置将限制生产者的单个请求中发送的消息批次数,以避免发送过大的请求。这也是最大消息批量大小的上限。 请注意,服务器拥有自己的批量大小,可能与此不同。 |
int | 1048576 | [0,...] | 中 |
partitioner.class | 实现Partitioner接口的的Partitioner类。 | class | org.apache.kafka.clients.producer.internals.DefaultPartitioner | 中 | |
receive.buffer.bytes | 读取数据时使用的TCP接收缓冲区(SO_RCVBUF)的大小。
如果值为-1,则将使用OS默认值。 |
int | 32768 | [-1,...] | 中 |
request.timeout.ms | 该配置控制客户端等待请求响应的最长时间。
如果在超时之前未收到响应,客户端将在必要时重新发送请求,如果重试耗尽,则该请求将失败。 这应该大于replica.lag.time.max.ms,以减少由于不必要的生产者重试引起的消息重复的可能性。 |
int | 30000 | [0,...] | 中 |
sasl.jaas.config | JAAS配置文件使用的格式的SASL连接的JAAS登录上下文参数。这里描述JAAS配置文件格式。该值的格式为:'(=)*;' | password | null | 中 | |
sasl.kerberos.service.name | Kafka运行的Kerberos主体名称。
可以在Kafka的JAAS配置或Kafka的配置中定义。 |
string | null | 中 | |
sasl.mechanism | SASL机制用于客户端连接。这是安全提供者可用与任何机制。
GSSAPI是默认机制。 |
string | GSSAPI | 中 | |
security.protocol | 用于与broker通讯的协议。
有效值为:PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL。 |
string | PLAINTEXT | 中 | |
send.buffer.bytes | 发送数据时,用于TCP发送缓存(SO_SNDBUF)的大小。
如果值为 -1,将默认使用系统的。 |
int | 131072 | [-1,...] | 中 |
ssl.enabled.protocols | 启用SSL连接的协议列表。 | list | TLSv1.2,TLSv1.1,TLSv1 | 中 | |
ssl.keystore.type | 密钥存储文件的文件格式。
对于客户端是可选的。 |
string | JKS | 中 | |
ssl.protocol | 最近的JVM中允许的值是TLS,TLSv1.1和TLSv1.2。
较旧的JVM可能支持SSL,SSLv2和SSLv3,但由于已知的安全漏洞,不建议使用SSL。 |
string | TLS | 中 | |
ssl.provider | 用于SSL连接的安全提供程序的名称。默认值是JVM的默认安全提供程序。 | string | null | 中 | |
ssl.truststore.type | 信任仓库文件的文件格式。 | string | JKS | 中 | |
enable.idempotence | 当设置为‘true’,生产者将确保每个消息正好一次复制写入到stream。如果‘false’,由于broker故障,生产者重试。
即,可以在流中写入重试的消息。此设置默认是‘false’。请注意,启用幂等式需要将max.in.flight.requests.per.connection设置为1,重试次数不能为零。 另外acks必须设置为“全部”。如果这些值保持默认值,我们将覆盖默认值。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 如果这些值设置为与幂等生成器不兼容的值,则将抛出一个ConfigException异常。 |
boolean | false | 低 | |
interceptor.classes | 实现ProducerInterceptor接口,你可以在生产者发布到Kafka群集之前拦截(也可变更)生产者收到的消息。
默认情况下没有拦截器。 |
list | null | 低 | |
max.in.flight.requests.per.connection | 阻塞之前,客户端单个连接上发送的未应答请求的最大数量。
注意,如果此设置设置大于1且发送失败,则会由于重试(如果启用了重试)会导致消息重新排序的风险。 |
int | 5 | [1,...] | 低 |
metadata.max.age.ms | 在一段时间段之后(以毫秒为单位),强制更新元数据,即使我们没有看到任何分区leader的变化,也会主动去发现新的broker或分区。 | long | 300000 | [0,...] | 低 |
metric.reporters | 用作metrics reporters(指标记录员)的类的列表。
实现MetricReporter接口,将受到新增加的度量标准创建类插入的通知。 JmxReporter始终包含在注册JMX统计信息中。 |
list | "" | 低 | |
metrics.num.samples | 维护用于计算度量的样例数量。 | int | 2 | [1,...] | 低 |
metrics.recording.level | 指标的最高记录级别。 | string | INFO | [INFO, DEBUG] | 低 |
metrics.sample.window.ms | 度量样例计算上 | long | 30000 | [0,...] | 低 |
reconnect.backoff.max.ms | 重新连接到重复无法连接的代理程序时等待的最大时间(毫秒)。
如果提供,每个主机的回退将会连续增加,直到达到最大值。 计算后退增加后,增加20%的随机抖动以避免连接风暴。 |
long | 1000 | [0,...] | 低 |
reconnect.backoff.ms | 尝试重新连接到给定主机之前等待的基本时间量。这避免了在循环中高频率的重复连接到主机。这种回退适应于客户端对broker的所有连接尝试。 | long | 50 | [0,...] | 低 |
retry.backoff.ms | 尝试重试指定topic分区的失败请求之前等待的时间。
这样可以避免在某些故障情况下高频次的重复发送请求。 |
long | 100 | [0,...] | 低 |
sasl.kerberos.kinit.cmd | Kerberos kinit 命令路径。 | string | /usr/bin/kinit | 低 | |
sasl.kerberos.min.time.before.relogin | Login线程刷新尝试之间的休眠时间。 | long | 60000 | 低 | |
sasl.kerberos.ticket.renew.jitter | 添加更新时间的随机抖动百分比。 | double | 0.05 | 低 | |
sasl.kerberos.ticket.renew.window.factor | 登录线程将睡眠,直到从上次刷新ticket到期时间的指定窗口因子为止,此时将尝试续订ticket。 | double | 0.8 | 低 | |
ssl.cipher.suites | 密码套件列表。这是使用TLS或SSL网络协议来协商用于网络连接的安全设置的认证,加密,MAC和密钥交换算法的命名组合。默认情况下,支持所有可用的密码套件。 | list | null | 低 | |
ssl.endpoint.identification.algorithm | 使用服务器证书验证服务器主机名的端点识别算法。 | string | null | 低 | |
ssl.keymanager.algorithm | 用于SSL连接的密钥管理因子算法。默认值是为Java虚拟机配置的密钥管理器工厂算法。 | string | SunX509 | 低 | |
ssl.secure.random.implementation | 用于SSL加密操作的SecureRandom PRNG实现。 | string | null | 低 | |
ssl.trustmanager.algorithm | 用于SSL连接的信任管理因子算法。
默认值是JAVA虚拟机配置的信任管理工厂算法。 |
string | PKIX | 低 | |
transaction.timeout.ms | 生产者在主动中止正在进行的交易之前,交易协调器等待事务状态更新的最大时间(以ms为单位)。
如果此值大于broker中的max.transaction.timeout.ms设置,则请求将失败,并报“InvalidTransactionTimeout”错误。 |
int | 60000 | 低 | |
transactional.id | 用于事务传递的TransactionalId。
这样可以跨多个生产者会话的可靠性语义,因为它允许客户端保证在开始任何新事务之前使用相同的TransactionalId的事务已经完成。 如果没有提供TransactionalId,则生产者被限制为幂等传递。 请注意,如果配置了TransactionalId,则必须启用enable.idempotence。 默认值为空,这意味着无法使用事务。 |
string | null | non-empty string | 低 |
kafka >= 2.0.0 | |||||
sasl.client.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL客户端回调处理程序类的全称。 | class | null | 中间 | |
sasl.login.callback.handler.class | 实现AuthenticateCallbackHandler接口的SASL登录回调处理程序类的全称。
对于broker来说,登录回调处理程序配置必须以监听器前缀和小写的SASL机制名称为前缀。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.callback.handler.class=com.example.CustomScramLoginCallbackHandler |
class | null | 中 | |
sasl.login.class | 实现Login接口的类的全称。
对于broker来说,login config必须以监听器前缀和SASL机制名称为前缀,并使用小写。 例如,listener.name.sasl_ssl.scram-sha-256.sasl.login.class=com.example.CustomScramLogin。 |
class | null | 中 | |
kafka >= 2.1.0 | |||||
client.dns.lookup | 控制客户端如何使用DNS查询。如果设置为 use_all_dns_ips,则依次连接到每个返回的IP地址,直到成功建立连接。断开连接后,使用下一个IP。
一旦所有的IP都被使用过一次,客户端就会再次从主机名中解析IP(s)(然而,JVM和操作系统都会缓存DNS名称查询)。如果设置为 resolve_canonical_bootstrap_servers_only,则将每个引导地址解析成一个canonical名称列表。在bootstrap阶段之后,这和use_all_dns_ips的行为是一样的。如果设置为 default(已弃用),则尝试连接到查找返回的第一个IP地址,即使查找返回多个IP地址。 |
string | use_all_dns_ips | [default, use_all_dns_ips, resolve_canonical_bootstrap_servers_only] | 中 |
delivery.timeout.ms | 调用send()返回后报告成功或失败的时间上限。
这限制了消息在发送前被延迟的总时间,等待broker确认的时间(如果期望的话),以及允许重试发送失败的时间。如果遇到不可恢复的错误,重试次数已经用尽,或者消息被添加到一个达到较早发送到期期限的批次中,生产者可能会报告未能在这个配置之前发送记录。 这个配置的值应该大于或等于request.timeout.ms和linger.ms之和。 |
int | 120000 (2 minutes) | [0,...] | 中 |
kafka >= 2.7 | |||||
ssl.truststore.certificates | 可信证书的格式由'ssl.truststore.type'指定。
默认的SSL引擎工厂只支持带X.509证书的PEM格式。 |
password | null | 高 | |
socket.connection.setup.timeout.max.ms | 客户端等待建立socket连接的最大时间。连接设置超时时间将随着每一次连续的连接失败而成倍增加,直到这个最大值。
为了避免连接风暴,超时时间将被应用一个0.2的随机因子,导致计算值在20%以下和20%以上的随机范围。 |
long | 127000 (127 seconds) | 中 | |
socket.connection.setup.timeout.ms | 客户端等待建立socket连接的时间。
如果在超时之前没有建立连接,客户端将关闭socket通道。 |
long | 10000 (10 seconds) | 中 |
Broker
基本配置如下:
- broker.id
- log.dirs
- zookeeper.connect
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
kafka >= 0.10版 | |||||
Broker
基本配置如下:
- broker.id
- log.dirs
- zookeeper.connect
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
kafka >= 0.10版 | |||||
Broker
基本配置如下:
- broker.id
- log.dirs
- zookeeper.connect
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
kafka >= 0.10版 | |||||
Broker
基本配置如下:
- broker.id
- log.dirs
- zookeeper.connect
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 |
---|---|---|---|---|---|
kafka >= 0.10版 | |||||