“Kafka:安装、配置”的版本间差异
跳到导航
跳到搜索
(→集群搭建) |
|||
(未显示同一用户的10个中间版本) | |||
第200行: | 第200行: | ||
== 推荐配置 == | == 推荐配置 == | ||
最重要的 producer 配置控制: | |||
* 压缩 | |||
* 同步生产 vs 异步生产 | |||
* 批处理大小(异步生产) | |||
最重要的 consumer 配置: | |||
* 获取消息的大小 | |||
=== 生产者服务器配置 === | |||
服务器生产服务器配置: | |||
<syntaxhighlight lang="xml" highlight=""> | |||
# Replication configurations | |||
num.replica.fetchers=4 | |||
replica.fetch.max.bytes=1048576 | |||
replica.fetch.wait.max.ms=500 | |||
replica.high.watermark.checkpoint.interval.ms=5000 | |||
replica.socket.timeout.ms=30000 | |||
replica.socket.receive.buffer.bytes=65536 | |||
replica.lag.time.max.ms=10000 | |||
replica.lag.max.messages=4000 | |||
controller.socket.timeout.ms=30000 | |||
controller.message.queue.size=10 | |||
# Log configuration | |||
num.partitions=8 | |||
message.max.bytes=1000000 | |||
auto.create.topics.enable=true | |||
log.index.interval.bytes=4096 | |||
log.index.size.max.bytes=10485760 | |||
log.retention.hours=168 | |||
log.flush.interval.ms=10000 | |||
log.flush.interval.messages=20000 | |||
log.flush.scheduler.interval.ms=2000 | |||
log.roll.hours=168 | |||
log.retention.check.interval.ms=300000 | |||
log.segment.bytes=1073741824 | |||
# ZK configuration | |||
zookeeper.connection.timeout.ms=6000 | |||
zookeeper.sync.time.ms=2000 | |||
# Socket server configuration | |||
num.io.threads=8 | |||
num.network.threads=8 | |||
socket.request.max.bytes=104857600 | |||
socket.receive.buffer.bytes=1048576 | |||
socket.send.buffer.bytes=1048576 | |||
queued.max.requests=16 | |||
fetch.purgatory.purge.interval.requests=100 | |||
producer.purgatory.purge.interval.requests=100 | |||
</syntaxhighlight> | |||
新版本推荐: | |||
<syntaxhighlight lang="xml" highlight=""> | |||
# ZooKeeper | |||
zookeeper.connect=[list of ZooKeeper servers] | |||
# Log configuration | |||
num.partitions=8 | |||
default.replication.factor=3 | |||
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).] | |||
# Other configurations | |||
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.] | |||
listeners=[list of listeners] | |||
auto.create.topics.enable=false | |||
min.insync.replicas=2 | |||
queued.max.requests=[number of concurrent requests] | |||
</syntaxhighlight> | |||
=== Java版本 === | |||
<pre> | |||
从安全的角度,我们推荐你使用最新的发布版本 JDK 1.8,旧版本已经公开披露了一些安全漏洞,LinkedIn 现在正在运行的是JDK 1.8 u5(希望升级到新版本)使用 G1 收集器。 | |||
如果你想在在 JDK 1.7 使用G1收集器(当前默认),请确保在 u51 或更高的版本,LinkedIn 尝试在 u21 测试,但该版本存在大量 G1 执行的问题。 | |||
</pre> | |||
LinkedIn的配置如下: | |||
<syntaxhighlight lang="xml" highlight=""> | |||
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC | |||
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M | |||
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 | |||
</syntaxhighlight> | |||
供参考,下面是关于 LinkedIn 的繁忙集群 (高峰) 之一的统计: | |||
* 60 brokers | |||
* 50k 分区 (副本 2) | |||
* 800k 消息/秒 | |||
* 300 MB/sec的入站, 1 GB/秒+ 出站 | |||
这个调整看来相当激进, 但是集群中的有 90% 的 GC 暂停时间大约是 21ms, 以及每秒小于 1 个的年轻代 GC。 | |||
== 配置 == | == 配置 == | ||
第213行: | 第306行: | ||
详细配置: | 详细配置: | ||
{| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | {| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | ||
| | |- | ||
! style="width:20%;" | 名称 | ! style="width:20%;" | 名称 | ||
! style="width: | ! style="width:50%;" | 描述 | ||
! style="width:5%;" | 类型 | ! style="width:5%;" | 类型 | ||
! style="width:5%;" | 默认 | ! style="width:5%;" | 默认 | ||
! style="width:5%;" | 有效值 | ! style="width:5%;" | 有效值 | ||
! style="width: | ! style="width:6%;" | 重要程度 | ||
! style="width: | ! style="width:9%;" | 更新模式 | ||
|- | |- | ||
! colspan="7" | kafka >= 0.10版 | ! colspan="7" | kafka >= 0.10版 | ||
第958行: | 第1,051行: | ||
详细配置: | 详细配置: | ||
{| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | {| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | ||
| | |- | ||
! style="width:20%;" | 名称 | ! style="width:20%;" | 名称 | ||
! style="width: | ! style="width:45%;" | 描述 | ||
! style="width:5%;" | 类型 | ! style="width:5%;" | 类型 | ||
! style="width:5%;" | 默认 | ! style="width:5%;" | 默认 | ||
! style="width:5%;" | 有效值 | ! style="width:5%;" | 有效值 | ||
! style="width: | ! style="width:10%;" | 服务器默认属性 | ||
! style="width: | ! style="width:10%;" | 更新模式 | ||
|- | |- | ||
| cleanup.policy | | cleanup.policy | ||
第1,092行: | 第1,185行: | ||
详细配置: | 详细配置: | ||
{| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | {| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | ||
| | |- | ||
! style="width:20%;" | 名称 | ! style="width:20%;" | 名称 | ||
! style="width: | ! style="width:50%;" | 描述 | ||
! style="width:5%;" | 类型 | ! style="width:5%;" | 类型 | ||
! style="width:5%;" | 默认 | ! style="width:5%;" | 默认 | ||
! style="width:5%;" | 有效值 | ! style="width:5%;" | 有效值 | ||
! style="width: | ! style="width:10%;" | 重要程度 | ||
|- | |- | ||
| bootstrap.servers | | bootstrap.servers | ||
第1,416行: | 第1,509行: | ||
新消费者配置: | 新消费者配置: | ||
{| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | {| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | ||
| | |- | ||
! style="width:20%;" | 名称 | ! style="width:20%;" | 名称 | ||
! style="width: | ! style="width:50%;" | 描述 | ||
! style="width:5%;" | 类型 | ! style="width:5%;" | 类型 | ||
! style="width:5%;" | 默认 | ! style="width:5%;" | 默认 | ||
! style="width:5%;" | 有效值 | ! style="width:5%;" | 有效值 | ||
! style="width: | ! style="width:10%;" | 重要程度 | ||
|- | |- | ||
| bootstrap.servers | | bootstrap.servers | ||
第1,729行: | 第1,822行: | ||
* zookeeper.connect | * zookeeper.connect | ||
{| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | {| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | ||
| | |- | ||
! style="width:20%;" | 名称 | ! style="width:20%;" | 名称 | ||
! style="width:5%;" | 默认值 | ! style="width:5%;" | 默认值 | ||
第1,864行: | 第1,957行: | ||
Kafka Stream客户端库配置: | Kafka Stream客户端库配置: | ||
{| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | {| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | ||
| | |- | ||
! style="width:20%;" | 名称 | ! style="width:20%;" | 名称 | ||
! style="width: | ! style="width:50%;" | 描述 | ||
! style="width:5%;" | 类型 | ! style="width:5%;" | 类型 | ||
! style="width:5%;" | 默认 | ! style="width:5%;" | 默认 | ||
! style="width:5%;" | 有效值 | ! style="width:5%;" | 有效值 | ||
! style="width: | ! style="width:10%;" | 重要程度 | ||
|- | |- | ||
| application.id | | application.id | ||
第2,036行: | 第2,129行: | ||
Kafka Connect框架的相关配置: | Kafka Connect框架的相关配置: | ||
{| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | {| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | ||
| | |- | ||
! style="width:20%;" | 名称 | ! style="width:20%;" | 名称 | ||
! style="width: | ! style="width:50%;" | 描述 | ||
! style="width:5%;" | 类型 | ! style="width:5%;" | 类型 | ||
! style="width:5%;" | 默认 | ! style="width:5%;" | 默认 | ||
! style="width:5%;" | 有效值 | ! style="width:5%;" | 有效值 | ||
! style="width: | ! style="width:10%;" | 重要程度 | ||
|- | |- | ||
| config.storage.topic | | config.storage.topic | ||
第2,383行: | 第2,476行: | ||
Kafka Admin客户端的配置: | Kafka Admin客户端的配置: | ||
{| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | {| class="wikitable mw-collapsible mw-collapsed" style="width: 100%" | ||
| | |- | ||
! style="width:20%;" | 名称 | ! style="width:20%;" | 名称 | ||
! style="width: | ! style="width:50%;" | 描述 | ||
! style="width:5%;" | 类型 | ! style="width:5%;" | 类型 | ||
! style="width:5%;" | 默认 | ! style="width:5%;" | 默认 | ||
! style="width:5%;" | 有效值 | ! style="width:5%;" | 有效值 | ||
! style="width: | ! style="width:10%;" | 重要程度 | ||
|- | |- | ||
| bootstrap.servers | | bootstrap.servers |
2021年12月19日 (日) 22:06的最新版本
单机安装
Kafka 解压即用,并没有繁琐的安装步骤,唯一注意的是其需要 Zookeeper 支持(但其自带有 Zookeeper)。
- 官网下载压缩包:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
- 提取 tar 文件:
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz
- 启动服务器:
$ cd kafka_2.11.0.9.0.0 $ bin/kafka-server-start.sh config/server.properties
- 服务器启动后,会得到以下响应:
$ bin/kafka-server-start.sh config/server.properties [2016-01-02 15:37:30,410] INFO KafkaConfig values: request.timeout.ms = 30000 log.roll.hours = 168 inter.broker.protocol.version = 0.9.0.X log.preallocate = false security.inter.broker.protocol = PLAINTEXT ……………………………………………. …………………………………………….
- 停止服务器:
$ bin/kafka-server-stop.sh config/server.properties
NOTE:
- 启动 kafka 需要执行 kafka-server-start.bat 文件,需要传入一个路径参数(server.config 文件的路径):
- 如果使用自行安装的 Zookeeper(先启动 Zookeeper,再启动 Kafka),使用“config/server.properties”。
- 如果使用 Kafka 自带的 Zookeeper(直接启动 Kafka 即可),使用:“config/zookeeper.properties”。
- properties 文件都位于“../kafka_x.xx.x.x.x.x/config”中。
使用
打开另一个命令终端启动kafka服务:
> bin/kafka-server-start.sh config/server.properties &
- 创建一个主题(topic):
- 创建一个名为“test”的Topic,只有一个分区和一个备份:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
- 创建好之后,可以通过运行以下命令,查看已创建的topic信息:
> bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092 Topic:quickstart-events PartitionCount:1 ReplicationFactor:1 Configs: Topic: quickstart-events Partition: 0 Leader: 0 Replicas: 0 Isr: 0
- 或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时会自动创建topic。
- 发送消息:
- Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
- 运行 producer(生产者),然后在控制台输入几条消息到服务器:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
- 消费消息:
- Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来,新打开一个命令控制台,输入:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning This is a message This is another message
- 如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。
- 使用 Kafka Connect 来 导入/导出 数据:
- 你可能在现有的系统中拥有大量的数据,如关系型数据库或传统的消息传递系统,以及许多已经使用这些系统的应用程序。Kafka Connect允许你不断地从外部系统提取数据到Kafka,反之亦然。用Kafka整合现有的系统是非常容易的。为了使这个过程更加容易,有数百个这样的连接器现成可用。
- 见:“Kafka:Connect”
- 使用Kafka Stream来处理数据:
- 一旦你的数据存储在Kafka中,你就可以用Kafka Streams客户端库来处理这些数据,该库适用于Java/Scala。它允许你实现自己的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka主题中。Kafka Streams将在客户端编写和部署标准Java和Scala应用程序的简单性与Kafka服务器端集群技术的优势相结合,使这些应用程序具有可扩展性、弹性、容错性和分布式。该库支持精确的一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
- 实现流行的 WordCount 算法【???】:
KStream<String, String> textLines = builder.stream("quickstart-events"); KTable<String, Long> wordCounts = textLines .flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" "))) .groupBy((keyIgnored, word) -> word) .count(); wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
- 停止Kafka:
- 使用 Ctrl-C 停止生产者和消费者客户端。
- 使用 Ctrl-C 停止 Kafka broker。
- 最后,使用 Ctrl-C 停止 ZooKeeper。
- 如果你还想删除你的本地Kafka环境的数据,包括你创建的消息,运行命令。
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
集群搭建
背景:如安装所示,已在集群机器上安装了 Kafka。
设置多个broker集群:
- 为每个broker创建一个配置文件:
> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
- 为不同broker修改配置文件:
config/server-1.properties: broker.id=1 listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1
config/server-2.properties: broker.id=2 listeners=PLAINTEXT://:9094 log.dir=/tmp/kafka-logs-2
- broker.id 是集群中每个节点的唯一且永久的名称。
- 【修改端口和日志目录是因为,现在在同一台机器上运行,要防止broker在同一端口上注册和覆盖对方的数据】
- 在启动新的kafka节点:
> bin/kafka-server-start.sh config/server-1.properties & ...
> bin/kafka-server-start.sh config/server-2.properties & ...
使用
- 创建一个新topic:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
- 命令“describe topics”,查看集群:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
- 其中:第一行是所有分区的摘要,其次,每一行提供一个分区信息(因为我们只有一个分区,所以只有一行)。
- “Leader”:该节点负责该分区的所有的读和写(每个节点的leader都是随机选择的)。
- “Replicas”:备份的节点列表,无论该节点是否是leader或者目前是否还活着,只是显示。
- “Isr”:“同步备份”的节点列表,也就是活着的节点并且正在同步leader。
- 发布信息:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic ... my test message 1 my test message 2 ^C
- 消费消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
- 集群容错测试:
- kill掉leader,Broker1作为当前的leader,也就是kill掉Broker1。
> ps | grep server-1.properties 7564 ttys002 0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java... > kill -9 7564
- 在Windows上使用:
> wmic process where "caption = 'java.exe' and commandline like '%server-1.properties%'" get processid ProcessId 6016 > taskkill /pid 6016 /f
- 命令“describe topics”,查看集群:
> binbin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic Topic:my-replicated-topic PartitionCount:1 ReplicationFactor:3 Configs: Topic: my-replicated-topic Partition: 0 Leader: 2 Replicas: 1,2,0 Isr: 2,0
- 备份节点之一成为新的leader,而broker1已经不在同步备份集合里了。
- 消费消息:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic my-replicated-topic ... my test message 1 my test message 2 ^C
- 如上,消息仍然没丢。
- kill掉leader,Broker1作为当前的leader,也就是kill掉Broker1。
推荐配置
最重要的 producer 配置控制:
- 压缩
- 同步生产 vs 异步生产
- 批处理大小(异步生产)
最重要的 consumer 配置:
- 获取消息的大小
生产者服务器配置
服务器生产服务器配置:
# Replication configurations
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
controller.socket.timeout.ms=30000
controller.message.queue.size=10
# Log configuration
num.partitions=8
message.max.bytes=1000000
auto.create.topics.enable=true
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
log.retention.hours=168
log.flush.interval.ms=10000
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=168
log.retention.check.interval.ms=300000
log.segment.bytes=1073741824
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
# Socket server configuration
num.io.threads=8
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=16
fetch.purgatory.purge.interval.requests=100
producer.purgatory.purge.interval.requests=100
新版本推荐:
# ZooKeeper
zookeeper.connect=[list of ZooKeeper servers]
# Log configuration
num.partitions=8
default.replication.factor=3
log.dir=[List of directories. Kafka should have its own dedicated disk(s) or SSD(s).]
# Other configurations
broker.id=[An integer. Start with 0 and increment by 1 for each new broker.]
listeners=[list of listeners]
auto.create.topics.enable=false
min.insync.replicas=2
queued.max.requests=[number of concurrent requests]
Java版本
从安全的角度,我们推荐你使用最新的发布版本 JDK 1.8,旧版本已经公开披露了一些安全漏洞,LinkedIn 现在正在运行的是JDK 1.8 u5(希望升级到新版本)使用 G1 收集器。 如果你想在在 JDK 1.7 使用G1收集器(当前默认),请确保在 u51 或更高的版本,LinkedIn 尝试在 u21 测试,但该版本存在大量 G1 执行的问题。
LinkedIn的配置如下:
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80
供参考,下面是关于 LinkedIn 的繁忙集群 (高峰) 之一的统计:
- 60 brokers
- 50k 分区 (副本 2)
- 800k 消息/秒
- 300 MB/sec的入站, 1 GB/秒+ 出站
这个调整看来相当激进, 但是集群中的有 90% 的 GC 暂停时间大约是 21ms, 以及每秒小于 1 个的年轻代 GC。
配置
Broker
基本配置如下:
- broker.id
- log.dirs
- zookeeper.connect
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 重要程度 | 展开更新模式 |
---|
Topic
与topic相关的配置,服务器的默认值,也可选择的覆盖指定的topic。
- 如果没有给出指定topic的配置,则将使用服务器默认值。
- 可以通过“-config”选项在topic创建时设置。
此示例使用自定义最大消息的大小和刷新率,创建一个名为 my-topic 的topic:
> bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
--replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
也可以使用“alter configs”命令修改或设置。 此示例修改更新 my-topic 的最大的消息大小:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
--alter --add-config max.message.bytes=128000
可以执行以下命令验证结果:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe
移除设置:
> bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --alter --delete-config max.message.bytes
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 服务器默认属性 | 展开更新模式 |
---|
Producer
详细配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 展开重要程度 |
---|
Consumer
在0.9.0.0中,我们引入了新的Java消费者来替代早期基于Scala的简单和高级消费者。新老客户端的配置如下。
新消费者配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 展开重要程度 |
---|
旧消费者配置:
- group.id
- zookeeper.connect
名称 | 默认值 | 展开描述 |
---|
Streams
Kafka Stream客户端库配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 展开重要程度 |
---|
Connect
Kafka Connect框架的相关配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 展开重要程度 |
---|
AdminClient
Kafka Admin客户端的配置:
名称 | 描述 | 类型 | 默认 | 有效值 | 展开重要程度 |
---|