“Kafka:基本操作”的版本间差异

来自Wikioe
跳到导航 跳到搜索
(建立内容为“category:Kafka == 关于 == 首先开始实现“单节点单代理”配置,然后将设置迁移到“单节点多代理”配置。 * 先安装 Java,Z…”的新页面)
 
无编辑摘要
第1行: 第1行:
[[category:Kafka]]
[[category:Kafka]]


== 关于 ==
== 添加、修改、删除 topic ==
首先开始实现“单节点单代理”配置,然后将设置迁移到“单节点多代理”配置。
如果'''第一次发布一个不存在的topic时,它会自动创建'''。也可以手动添加topic。
* 先安装 Java,ZooKeeper 和 Kafka 。
* 在迁移到 Kafka Cluster Setup 之前,首先需要启动 ZooKeeper,因为 Kafka Cluster 使用 ZooKeeper。




# 添加、修改 topic:
<syntaxhighlight lang="bash" highlight="">
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
      --partitions 20 --replication-factor 3 --config x=y
</syntaxhighlight>
#* 副本控制每个消息在服务器中的备份。
#*: 如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,'''集群中推荐设置 2 或 3 个副本''',才不会中断数据消费。
#* 分区数控制 topic 将分片成多少 log。
#*: 关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。
#* 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。
# 更改topic的配置和分区:
## kafka版本 < 2.2
##: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
        --partitions 20 --replication-factor 3 --config x=y
</syntaxhighlight>
## kafka版本 >= 2.2
##: '''<syntaxhighlight lang="bash" highlight="">
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
        --partitions 40
</syntaxhighlight>'''
#* 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。
#*: 就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。
# 添加配置:
#: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
</syntaxhighlight>
# 移除配置:
#: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
</syntaxhighlight>
# 删除 topic:
#: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
</syntaxhighlight>
#* topic 删除选项默认是关闭的,设置服务器配置开启它。
#*: <syntaxhighlight lang="bash" highlight="">
delete.topic.enable=true
</syntaxhighlight>




* Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。


== 优雅地关闭Kafka ==
<pre>
Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。


无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。


对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。
</pre>


当服务器正常停止时,它将利用两种优化:
# 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。
# 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。




只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置:
<syntaxhighlight lang="xml" highlight="">
controlled.shutdown.enable=true
</syntaxhighlight>
* 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。
*: 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。


== 平衡leader ==
当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。
* 这意味着:在默认情况下,当这个'''broker重新启动之后,它的所有分区都将仅作为follower''',不再用于客户端的读写操作。


为了避免这种不平衡,Kafka 提出了“'''首选副本'''”的概念:
: 如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。




您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader :
<syntaxhighlight lang="bash" highlight="">
# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot


<syntaxhighlight lang="java" highlight="">
# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
</syntaxhighlight>
手动运行很无趣,你可以通过这个配置设置为自动执行:
<syntaxhighlight lang="xml" highlight="">
auto.leader.rebalance.enable=true
</syntaxhighlight>
 
=== 镜像集群之间的数据 ===
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
#: <syntaxhighlight lang="bash" highlight="">


</syntaxhighlight>
</syntaxhighlight>

2021年5月20日 (四) 02:33的版本


添加、修改、删除 topic

如果第一次发布一个不存在的topic时,它会自动创建。也可以手动添加topic。


  1. 添加、修改 topic:
 > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name 
       --partitions 20 --replication-factor 3 --config x=y
    • 副本控制每个消息在服务器中的备份。
      如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,集群中推荐设置 2 或 3 个副本,才不会中断数据消费。
    • 分区数控制 topic 将分片成多少 log。
      关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。
    • 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。
  1. 更改topic的配置和分区:
    1. kafka版本 < 2.2
      > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
              --partitions 20 --replication-factor 3 --config x=y
      
    2. kafka版本 >= 2.2
      > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
              --partitions 40
      
    • 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。
      就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。
  2. 添加配置:
    > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
    
  3. 移除配置:
    > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
    
  4. 删除 topic:
    > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
    
    • topic 删除选项默认是关闭的,设置服务器配置开启它。
      delete.topic.enable=true
      


  • Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。

优雅地关闭Kafka

Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。

无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。

对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。

当服务器正常停止时,它将利用两种优化:

  1. 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。
  2. 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。


只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置:

controlled.shutdown.enable=true
  • 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。
    这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。

平衡leader

当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。

  • 这意味着:在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。

为了避免这种不平衡,Kafka 提出了“首选副本”的概念:

如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。


您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader :

# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

手动运行很无趣,你可以通过这个配置设置为自动执行:

auto.leader.rebalance.enable=true

镜像集群之间的数据