“Kafka:基本操作”的版本间差异

来自Wikioe
跳到导航 跳到搜索
无编辑摘要
第90行: 第90行:
</syntaxhighlight>
</syntaxhighlight>


=== 镜像集群之间的数据 ===
== 镜像集群之间的数据 ==
我们指的是kafka集群之间复制数据“镜像”,为避免在单个集群中的节点之间发生复制混乱的。




kafka附带了kafka集群之间的镜像数据的工具。该工具'''从一个源集群读取和写入到目标集群''',像这样:
: [[File:Kafka:镜像集群之间的数据.png|400px]]


常见的用例是镜像在另一个数据中心提供一个副本。
* 你可以运行很多这样的镜像进程来提高吞吐和容错性(如果某个进程挂了,则其他的进程会接管)
* 数据从源集群中的topic读取并将其写入到目标集群中相名的topic。事实上,镜像制作不比消费者和生产者连接要好。
* 源和目标集群是完全独立的实体:分区数和offset可以都不相同,就是因为这个原因,镜像集群并不是真的打算作为一个容错机制(消费者位置是不同的),为此,我们推荐使用正常的集群复制。然而,镜像制造将保留和使用分区的消息key,以便每个键基础上保存顺序。




示例,从两个输入集群镜像到一个topic(名为:my-topic):
<syntaxhighlight lang="bash" highlight="">
> bin/kafka-run-class.sh kafka.tools.MirrorMaker
      --consumer.config consumer-1.properties --consumer.config consumer-2.properties
      --producer.config producer.properties --whitelist my-topic
</syntaxhighlight>
* 注意,我们用 --whitelist 选项指定topic列表。此选项允许使用java风格的正则表达式。
*: 所以你可以使用 --whitelist 'A|B' ,A和B是镜像名。或者你可以镜像所有topic。也可以使用--whitelist ‘*’镜像所有topic,为了确保引用的正则表达式不会被shell认为是一个文件路径,我们允许使用‘,’ 而不是‘|’指定topic列表。
* 你可以很容易的排除哪些是不需要的,可以用--blacklist来排除,目前--new.consumer不支持。
* 镜像结合配置 auto.create.topics.enable=true,这样副本集群就会自动创建和复制。


== 检查消费者位置 ==
有时候需要去查看你的消费者的位置。




我们有一个显示“消费者组中”所有消费者的位置的工具,显示日志其落后多远。如下:
: 消费者组名为my-group,消费者topic名为my-topic,
<syntaxhighlight lang="bash" highlight="">
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test


Group          Topic                          Pid Offset          logSize        Lag            Owner
my-group        my-topic                      0  0              0              0              test_jkreps-mn-1394154511599-60744496-0
my-group        my-topic                      1  0              0              0              test_jkreps-mn-1394154521217-1a0be913-0
</syntaxhighlight>
* 注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker 已经不支持了。你应该使用 '''kafka.admin.ConsumerGroupCommand''' 或 '''bin/kafka-consumer-groups.sh'''  脚本来管理消费者组,包括用新消费者API创建的消费者。
*: <syntaxhighlight lang="bash" highlight="">
## 0.9+
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group


## 0.10+
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
</syntaxhighlight>


=== Managing Consumer Groups(管理消费者组) ===
用 '''ConsumerGroupCommand''' 工具,我们可以使用 '''list''','''describe''',或 '''delete''' 消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。
* 当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。




示例:
# 要列出所有主题中的所有用户组:
#: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list


test-consumer-group
</syntaxhighlight>
# 要像前面的示例中那样使用ConsumerOffsetChecker查看偏移量,可以这样“describe”消费者组:
#: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group


GROUP                  TOPIC                      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG            OWNER
test-consumer-group        test-foo                      0          1          3      2              consumer-1_/127.0.0.1
</syntaxhighlight>




#: <syntaxhighlight lang="bash" highlight="">
还有一切其他的命令可以提供消费组更多详细信息:
* '''-members''':此选项提供使用者组中所有活动成员的列表。
*: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
 
CONSUMER-ID                                    HOST            CLIENT-ID      #PARTITIONS
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1      2
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4      1
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2      3
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3      0
</syntaxhighlight>
* '''--members --verbose''':除了上述“ --members”选项报告的信息之外,此选项还提供分配给每个成员的分区
*: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
 
CONSUMER-ID                                    HOST            CLIENT-ID      #PARTITIONS    ASSIGNMENT
consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1      2              topic1(0), topic2(0)
consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4      1              topic3(2)
consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2      3              topic2(1), topic3(0,1)
consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3      0              -
</syntaxhighlight>
* '''-offsets''':默认的describe选项,与“--describe”选项相同的输出。
*: <syntaxhighlight lang="bash" highlight="">
 
</syntaxhighlight>
* '''--state''':此选项提供有用的组级别信息
*: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
 
COORDINATOR (ID)          ASSIGNMENT-STRATEGY      STATE                #MEMBERS
localhost:9092 (0)        range                    Stable              4
</syntaxhighlight>
 
 
要手动删除一个或多个消费者组,可以使用“'''--delete'''”:
<syntaxhighlight lang="bash" highlight="">
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
 
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
</syntaxhighlight>
 
 
要重置消费者组的offset,可以使用“'''--reset-offsets'''”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:'''--all-topics''' 或 '''--topic'''。
* 除非您使用 '''--from-file''' 方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。
 
它有3个执行操作命令选项:
* (默认)显示要重置的offset
* 执行'''--reset-offsets'''处理
* '''--export''':将结果导出为CSV格式。
 
 
'''--reset-offsets''' 还具有以下场景可供选择(必须选择至少一个场景):
* --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss'
* --to-earliest : 将offset重置为最早的offset。
* --to-latest : 将offsets重置为最新的offsets。
* --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。
* --from-file : 将offset重置为CSV文件中定义的值。
* --to-current : 将offset重置为当前的offset。
* --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS”
* --to-offset : 将offset重置为指定的。
 
请注意,超出范围的offset将调整为可用的offset。例如,如果offset最大为10,设置为15时,则实际上将选择offset将为10。
 
例如,要将消费者组的offset重置为最新的offset:
<syntaxhighlight lang="bash" highlight="">
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
 
TOPIC                          PARTITION  NEW-OFFSET
topic1                        0          0
</syntaxhighlight>
 
 
* 如果你使用是老的高级消费者并在zookeeper存储消费者组的元数据(即:offsets.storage=zookeeper),则通过 '''--zookeeper''',而不是 '''bootstrap-server'''
*: <syntaxhighlight lang="bash" highlight="">
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
</syntaxhighlight>
 
== 扩大集群 ==
 
 
 
 
*
*: <syntaxhighlight lang="bash" highlight="">
 
</syntaxhighlight>
 
 
 
 
 
 
 
<syntaxhighlight lang="bash" highlight="">


</syntaxhighlight>
</syntaxhighlight>

2021年5月20日 (四) 10:12的版本


添加、修改、删除 topic

如果第一次发布一个不存在的topic时,它会自动创建。也可以手动添加topic。


  1. 添加、修改 topic:
 > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name 
       --partitions 20 --replication-factor 3 --config x=y
    • 副本控制每个消息在服务器中的备份。
      如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,集群中推荐设置 2 或 3 个副本,才不会中断数据消费。
    • 分区数控制 topic 将分片成多少 log。
      关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。
    • 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。
  1. 更改topic的配置和分区:
    1. kafka版本 < 2.2
      > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
              --partitions 20 --replication-factor 3 --config x=y
      
    2. kafka版本 >= 2.2
      > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
              --partitions 40
      
    • 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。
      就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。
  2. 添加配置:
    > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
    
  3. 移除配置:
    > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
    
  4. 删除 topic:
    > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
    
    • topic 删除选项默认是关闭的,设置服务器配置开启它。
      delete.topic.enable=true
      


  • Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。

优雅地关闭Kafka

Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。

无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。

对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。

当服务器正常停止时,它将利用两种优化:

  1. 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。
  2. 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。


只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置:

controlled.shutdown.enable=true
  • 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。
    这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。

平衡leader

当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。

  • 这意味着:在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。

为了避免这种不平衡,Kafka 提出了“首选副本”的概念:

如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。


您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader :

# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

手动运行很无趣,你可以通过这个配置设置为自动执行:

auto.leader.rebalance.enable=true

镜像集群之间的数据

我们指的是kafka集群之间复制数据“镜像”,为避免在单个集群中的节点之间发生复制混乱的。


kafka附带了kafka集群之间的镜像数据的工具。该工具从一个源集群读取和写入到目标集群,像这样:

Kafka:镜像集群之间的数据.png

常见的用例是镜像在另一个数据中心提供一个副本。

  • 你可以运行很多这样的镜像进程来提高吞吐和容错性(如果某个进程挂了,则其他的进程会接管)
  • 数据从源集群中的topic读取并将其写入到目标集群中相名的topic。事实上,镜像制作不比消费者和生产者连接要好。
  • 源和目标集群是完全独立的实体:分区数和offset可以都不相同,就是因为这个原因,镜像集群并不是真的打算作为一个容错机制(消费者位置是不同的),为此,我们推荐使用正常的集群复制。然而,镜像制造将保留和使用分区的消息key,以便每个键基础上保存顺序。


示例,从两个输入集群镜像到一个topic(名为:my-topic):

> bin/kafka-run-class.sh kafka.tools.MirrorMaker
       --consumer.config consumer-1.properties --consumer.config consumer-2.properties 
       --producer.config producer.properties --whitelist my-topic
  • 注意,我们用 --whitelist 选项指定topic列表。此选项允许使用java风格的正则表达式。
    所以你可以使用 --whitelist 'A|B' ,A和B是镜像名。或者你可以镜像所有topic。也可以使用--whitelist ‘*’镜像所有topic,为了确保引用的正则表达式不会被shell认为是一个文件路径,我们允许使用‘,’ 而不是‘|’指定topic列表。
  • 你可以很容易的排除哪些是不需要的,可以用--blacklist来排除,目前--new.consumer不支持。
  • 镜像结合配置 auto.create.topics.enable=true,这样副本集群就会自动创建和复制。

检查消费者位置

有时候需要去查看你的消费者的位置。


我们有一个显示“消费者组中”所有消费者的位置的工具,显示日志其落后多远。如下:

消费者组名为my-group,消费者topic名为my-topic,
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

Group           Topic                          Pid Offset          logSize         Lag             Owner
my-group        my-topic                       0   0               0               0               test_jkreps-mn-1394154511599-60744496-0
my-group        my-topic                       1   0               0               0               test_jkreps-mn-1394154521217-1a0be913-0
  • 注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker 已经不支持了。你应该使用 kafka.admin.ConsumerGroupCommandbin/kafka-consumer-groups.sh 脚本来管理消费者组,包括用新消费者API创建的消费者。
    ## 0.9+
    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
    
    ## 0.10+
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
    

Managing Consumer Groups(管理消费者组)

ConsumerGroupCommand 工具,我们可以使用 listdescribe,或 delete 消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。

  • 当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。


示例:

  1. 要列出所有主题中的所有用户组:
    > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
    
     test-consumer-group
    
  2. 要像前面的示例中那样使用ConsumerOffsetChecker查看偏移量,可以这样“describe”消费者组:
    > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group
    
    GROUP                  TOPIC                       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
    test-consumer-group        test-foo                       0          1           3      2               consumer-1_/127.0.0.1
    


还有一切其他的命令可以提供消费组更多详细信息:

  • -members:此选项提供使用者组中所有活动成员的列表。
    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
    
    CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
    consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
    consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
    consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0
    
  • --members --verbose:除了上述“ --members”选项报告的信息之外,此选项还提供分配给每个成员的分区
    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
    
    CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
    consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
    consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
    consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -
    
  • -offsets:默认的describe选项,与“--describe”选项相同的输出。
  • --state:此选项提供有用的组级别信息
    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
    
    COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
    localhost:9092 (0)        range                     Stable               4
    


要手动删除一个或多个消费者组,可以使用“--delete”:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

 Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.


要重置消费者组的offset,可以使用“--reset-offsets”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:--all-topics--topic

  • 除非您使用 --from-file 方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。

它有3个执行操作命令选项:

  • (默认)显示要重置的offset
  • 执行--reset-offsets处理
  • --export:将结果导出为CSV格式。


--reset-offsets 还具有以下场景可供选择(必须选择至少一个场景):

  • --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest : 将offset重置为最早的offset。
  • --to-latest : 将offsets重置为最新的offsets。
  • --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。
  • --from-file : 将offset重置为CSV文件中定义的值。
  • --to-current : 将offset重置为当前的offset。
  • --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS”
  • --to-offset : 将offset重置为指定的。

请注意,超出范围的offset将调整为可用的offset。例如,如果offset最大为10,设置为15时,则实际上将选择offset将为10。

例如,要将消费者组的offset重置为最新的offset:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

 TOPIC                          PARTITION  NEW-OFFSET
topic1                         0          0


  • 如果你使用是老的高级消费者并在zookeeper存储消费者组的元数据(即:offsets.storage=zookeeper),则通过 --zookeeper,而不是 bootstrap-server
    > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
    

扩大集群