“Kafka:基本操作”的版本间差异
跳到导航
跳到搜索
(建立内容为“category:Kafka == 关于 == 首先开始实现“单节点单代理”配置,然后将设置迁移到“单节点多代理”配置。 * 先安装 Java,Z…”的新页面) |
无编辑摘要 |
||
第1行: | 第1行: | ||
[[category:Kafka]] | [[category:Kafka]] | ||
== | == 添加、修改、删除 topic == | ||
如果'''第一次发布一个不存在的topic时,它会自动创建'''。也可以手动添加topic。 | |||
# 添加、修改 topic: | |||
<syntaxhighlight lang="bash" highlight=""> | |||
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name | |||
--partitions 20 --replication-factor 3 --config x=y | |||
</syntaxhighlight> | |||
#* 副本控制每个消息在服务器中的备份。 | |||
#*: 如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,'''集群中推荐设置 2 或 3 个副本''',才不会中断数据消费。 | |||
#* 分区数控制 topic 将分片成多少 log。 | |||
#*: 关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。 | |||
#* 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。 | |||
# 更改topic的配置和分区: | |||
## kafka版本 < 2.2 | |||
##: <syntaxhighlight lang="bash" highlight=""> | |||
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name | |||
--partitions 20 --replication-factor 3 --config x=y | |||
</syntaxhighlight> | |||
## kafka版本 >= 2.2 | |||
##: '''<syntaxhighlight lang="bash" highlight=""> | |||
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \ | |||
--partitions 40 | |||
</syntaxhighlight>''' | |||
#* 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。 | |||
#*: 就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。 | |||
# 添加配置: | |||
#: <syntaxhighlight lang="bash" highlight=""> | |||
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y | |||
</syntaxhighlight> | |||
# 移除配置: | |||
#: <syntaxhighlight lang="bash" highlight=""> | |||
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x | |||
</syntaxhighlight> | |||
# 删除 topic: | |||
#: <syntaxhighlight lang="bash" highlight=""> | |||
> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name | |||
</syntaxhighlight> | |||
#* topic 删除选项默认是关闭的,设置服务器配置开启它。 | |||
#*: <syntaxhighlight lang="bash" highlight=""> | |||
delete.topic.enable=true | |||
</syntaxhighlight> | |||
* Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。 | |||
== 优雅地关闭Kafka == | |||
<pre> | |||
Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。 | |||
无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。 | |||
对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。 | |||
</pre> | |||
当服务器正常停止时,它将利用两种优化: | |||
# 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。 | |||
# 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。 | |||
只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置: | |||
<syntaxhighlight lang="xml" highlight=""> | |||
controlled.shutdown.enable=true | |||
</syntaxhighlight> | |||
* 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。 | |||
*: 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。 | |||
== 平衡leader == | |||
当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。 | |||
* 这意味着:在默认情况下,当这个'''broker重新启动之后,它的所有分区都将仅作为follower''',不再用于客户端的读写操作。 | |||
为了避免这种不平衡,Kafka 提出了“'''首选副本'''”的概念: | |||
: 如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。 | |||
您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader : | |||
<syntaxhighlight lang="bash" highlight=""> | |||
# kafka版本 <= 2.4 | |||
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot | |||
<syntaxhighlight lang=" | # kafka新版本 | ||
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port | |||
</syntaxhighlight> | |||
手动运行很无趣,你可以通过这个配置设置为自动执行: | |||
<syntaxhighlight lang="xml" highlight=""> | |||
auto.leader.rebalance.enable=true | |||
</syntaxhighlight> | |||
=== 镜像集群之间的数据 === | |||
#: <syntaxhighlight lang="bash" highlight=""> | |||
</syntaxhighlight> | </syntaxhighlight> |
2021年5月20日 (四) 02:33的版本
添加、修改、删除 topic
如果第一次发布一个不存在的topic时,它会自动创建。也可以手动添加topic。
- 添加、修改 topic:
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
--partitions 20 --replication-factor 3 --config x=y
- 副本控制每个消息在服务器中的备份。
- 如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,集群中推荐设置 2 或 3 个副本,才不会中断数据消费。
- 分区数控制 topic 将分片成多少 log。
- 关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。
- 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。
- 副本控制每个消息在服务器中的备份。
- 更改topic的配置和分区:
- kafka版本 < 2.2
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
- kafka版本 >= 2.2
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \ --partitions 40
- 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。
- 就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。
- kafka版本 < 2.2
- 添加配置:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
- 移除配置:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
- 删除 topic:
> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
- topic 删除选项默认是关闭的,设置服务器配置开启它。
delete.topic.enable=true
- Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。
优雅地关闭Kafka
Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。 无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。 对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。
当服务器正常停止时,它将利用两种优化:
- 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。
- 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。
只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置:
controlled.shutdown.enable=true
- 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。
- 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。
平衡leader
当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。
- 这意味着:在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。
为了避免这种不平衡,Kafka 提出了“首选副本”的概念:
- 如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。
您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader :
# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
手动运行很无趣,你可以通过这个配置设置为自动执行:
auto.leader.rebalance.enable=true