Kafka:基本操作

来自Wikioe
跳到导航 跳到搜索


添加、修改、删除 topic

如果第一次发布一个不存在的topic时,它会自动创建。也可以手动添加topic。


  1. 添加、修改 topic:
     > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name 
           --partitions 20 --replication-factor 3 --config x=y
    
    • 副本控制每个消息在服务器中的备份。
      如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,集群中推荐设置 2 或 3 个副本,才不会中断数据消费。
    • 分区数控制 topic 将分片成多少 log。
      关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。
    • 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。
  2. 更改topic的配置和分区:
    1. kafka版本 < 2.2
      > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
              --partitions 20 --replication-factor 3 --config x=y
      
    2. kafka版本 >= 2.2
      > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \
              --partitions 40
      
    • 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。
      就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。
  3. 添加配置:
    > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
    
  4. 移除配置:
    > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
    
  5. 删除 topic:
    > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
    
    • topic 删除选项默认是关闭的,设置服务器配置开启它。
      delete.topic.enable=true
      


  • Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。

优雅地关闭Kafka

Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。

无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。

对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。

当服务器正常停止时,它将利用两种优化:

  1. 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。
  2. 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。


只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置:

controlled.shutdown.enable=true
  • 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。
    这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。

平衡leader

当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。

  • 这意味着:在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。

为了避免这种不平衡,Kafka 提出了“首选副本”的概念:

如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。


您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader :

# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port

手动运行很无趣,你可以通过这个配置设置为自动执行:

auto.leader.rebalance.enable=true

镜像集群之间的数据

我们指的是kafka集群之间复制数据“镜像”,为避免在单个集群中的节点之间发生复制混乱的。


kafka附带了kafka集群之间的镜像数据的工具。该工具从一个源集群读取和写入到目标集群,像这样:

Kafka:镜像集群之间的数据.png

常见的用例是镜像在另一个数据中心提供一个副本。

  • 你可以运行很多这样的镜像进程来提高吞吐和容错性(如果某个进程挂了,则其他的进程会接管)
  • 数据从源集群中的topic读取并将其写入到目标集群中相名的topic。事实上,镜像制作不比消费者和生产者连接要好。
  • 源和目标集群是完全独立的实体:分区数和offset可以都不相同,就是因为这个原因,镜像集群并不是真的打算作为一个容错机制(消费者位置是不同的),为此,我们推荐使用正常的集群复制。然而,镜像制造将保留和使用分区的消息key,以便每个键基础上保存顺序。


示例,从两个输入集群镜像到一个topic(名为:my-topic):

> bin/kafka-run-class.sh kafka.tools.MirrorMaker
       --consumer.config consumer-1.properties --consumer.config consumer-2.properties 
       --producer.config producer.properties --whitelist my-topic
  • 注意,我们用 --whitelist 选项指定topic列表。此选项允许使用java风格的正则表达式。
    所以你可以使用 --whitelist 'A|B' ,A和B是镜像名。或者你可以镜像所有topic。也可以使用--whitelist ‘*’镜像所有topic,为了确保引用的正则表达式不会被shell认为是一个文件路径,我们允许使用‘,’ 而不是‘|’指定topic列表。
  • 你可以很容易的排除哪些是不需要的,可以用--blacklist来排除,目前--new.consumer不支持。
  • 镜像结合配置 auto.create.topics.enable=true,这样副本集群就会自动创建和复制。

检查消费者位置

有时候需要去查看你的消费者的位置。


我们有一个显示“消费者组中”所有消费者的位置的工具,显示日志其落后多远。如下:

消费者组名为my-group,消费者topic名为my-topic,
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test

Group           Topic                          Pid Offset          logSize         Lag             Owner
my-group        my-topic                       0   0               0               0               test_jkreps-mn-1394154511599-60744496-0
my-group        my-topic                       1   0               0               0               test_jkreps-mn-1394154521217-1a0be913-0
  • 注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker 已经不支持了。你应该使用 kafka.admin.ConsumerGroupCommandbin/kafka-consumer-groups.sh 脚本来管理消费者组,包括用新消费者API创建的消费者。
    ## 0.9+
    bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group
    
    ## 0.10+
    bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
    

Managing Consumer Groups(管理消费者组)

ConsumerGroupCommand 工具,我们可以使用 listdescribe,或 delete 消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。

  • 当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。


示例:

  1. 要列出所有主题中的所有用户组:
    > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list
    
     test-consumer-group
    
  2. 要像前面的示例中那样使用ConsumerOffsetChecker查看偏移量,可以这样“describe”消费者组:
    > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group
    
    GROUP                  TOPIC                       PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             OWNER
    test-consumer-group        test-foo                       0          1           3      2               consumer-1_/127.0.0.1
    


还有一切其他的命令可以提供消费组更多详细信息:

  • -members:此选项提供使用者组中所有活动成员的列表。
    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members
    
    CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS
    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2
    consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1
    consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3
    consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0
    
  • --members --verbose:除了上述“ --members”选项报告的信息之外,此选项还提供分配给每个成员的分区
    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose
    
    CONSUMER-ID                                    HOST            CLIENT-ID       #PARTITIONS     ASSIGNMENT
    consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1      consumer1       2               topic1(0), topic2(0)
    consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1      consumer4       1               topic3(2)
    consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1      consumer2       3               topic2(1), topic3(0,1)
    consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1      consumer3       0               -
    
  • -offsets:默认的describe选项,与“--describe”选项相同的输出。
  • --state:此选项提供有用的组级别信息
    > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state
    
    COORDINATOR (ID)          ASSIGNMENT-STRATEGY       STATE                #MEMBERS
    localhost:9092 (0)        range                     Stable               4
    


要手动删除一个或多个消费者组,可以使用“--delete”:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group

 Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.


要重置消费者组的offset,可以使用“--reset-offsets”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:--all-topics--topic

  • 除非您使用 --from-file 方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。

它有3个执行操作命令选项:

  • (默认)显示要重置的offset
  • 执行--reset-offsets处理
  • --export:将结果导出为CSV格式。


--reset-offsets 还具有以下场景可供选择(必须选择至少一个场景):

  • --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss'
  • --to-earliest : 将offset重置为最早的offset。
  • --to-latest : 将offsets重置为最新的offsets。
  • --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。
  • --from-file : 将offset重置为CSV文件中定义的值。
  • --to-current : 将offset重置为当前的offset。
  • --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS”
  • --to-offset : 将offset重置为指定的。

请注意,超出范围的offset将调整为可用的offset。例如,如果offset最大为10,设置为15时,则实际上将选择offset将为10。

例如,要将消费者组的offset重置为最新的offset:

> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest

 TOPIC                          PARTITION  NEW-OFFSET
topic1                         0          0


  • 如果你使用是老的高级消费者并在zookeeper存储消费者组的元数据(即:offsets.storage=zookeeper),则通过 --zookeeper,而不是 bootstrap-server
    > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
    

扩大集群

增加新服务到kafka集群是很容易的,只要为新服务分配一个独一无二的 Broker ID 并启动即可。但是,新的服务不会自动分配到任何数据,因此除非将分区移动到这些服务器上,否则在创建新主题之前,它们不会执行任何工作。

因此,通常在将计算机添加到集群时,您会希望将一些现有数据迁移到这些计算机。

迁移数据的过程是手动启动的,但是执行过程是完全自动化的。在kafka后台内部中,kafka将添加新的服务器,并作为正在迁移分区的 follower,来完全复制该分区现有的数据。当新服务器完全复制该分区的内容并加入同步副本,成为现有副本之一后,就将现有的副本分区上的数据删除。

分区重新分配工具可以用于跨 broker 迁移分区,理想的分区分配将确保所有的 broker 数据负载和分区大小。分区分配工具没有自动研究 kafka 集群的数据分布和迁移分区达到负载分布的能力,因此,管理员要弄清楚哪些topic或分区应该迁移。


分区分配工具的3种模式:

  1. --generate:在此模式下,给定主题列表和代理列表,以将指定的topic的所有parition都移动到新的broker。(是生成分配规则json文件的
    • 此选项仅提供了一种方便的方法,可以在给定主题和目标代理列表的情况下生成分区重新分配计划。
  2. --execute:在此模式下,是执行你用 --generate 生成的分配规则json文件的,(用 --reassignment-json-file 选项),可以是自定义的分配计划,也可以是由管理员或通过 --generate 选项生成的。
  3. --verify:在此模式下,该工具验证在最后一次 --execute 期间列出的所有分区的重新分配状态。状态可以是成功完成、失败或正在进行

自动将数据迁移到新机器

使用分区重新分配工具将从当前的broker集的一些topic移到新添加的broker。同时扩大现有集群,因为这很容易将整个topic移动到新的broker,而不是每次移动一个parition,你要提供新的broker和新broker的目标列表的topic列表(就是刚才的生成的json文件)。然后工具将根据你提供的列表把topic的所有parition均匀地分布在所有的broker,topic的副本保持不变。


例如,下面的例子将主题 foo1,foo2 的所有分区移动到新的 broker 5,6。移动结束后,主题 foo1 和 foo2 所有的分区都会只会在 broker 5,6:

【下面所有的json文件,都是要你自己新建的,不是自动创建的,需要你自己把生成的规则复制到你新建的json文件里,然后执行。】

  1. 执行迁移工具需要接收一个json文件,首先需要你确认topic的迁移计划并创建json文件,如下所示
    > cat topics-to-move.json
    {"topics": [{"topic": "foo1"},
                {"topic": "foo2"}],
     "version":1
    }
    
  2. 一旦json文件准备就绪,就可以使用分区重新分配工具来生成候选分配
    > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate 
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                   {"topic":"foo1","partition":0,"replicas":[3,4]},
                   {"topic":"foo2","partition":2,"replicas":[1,2]},
                   {"topic":"foo2","partition":0,"replicas":[3,4]},
                   {"topic":"foo1","partition":1,"replicas":[2,3]},
                   {"topic":"foo2","partition":1,"replicas":[2,3]}]
    }
    
    Proposed partition reassignment configuration
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                   {"topic":"foo1","partition":0,"replicas":[5,6]},
                   {"topic":"foo2","partition":2,"replicas":[5,6]},
                   {"topic":"foo2","partition":0,"replicas":[5,6]},
                   {"topic":"foo1","partition":1,"replicas":[5,6]},
                   {"topic":"foo2","partition":1,"replicas":[5,6]}]
    }
    
  3. 生成从主题 foo1,foo2 迁移所有的分区到 broker 5,6 的候选分配规则。
    • 注意,这个时候,迁移还没有开始,它只是告诉你当前分配和新的分配规则,当前分配规则用来回滚,新的分配规则保存在json文件(例如,我保存在 expand-cluster-reassignment.json这个文件下)然后,用--execute选项来执行它。
    > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
                   {"topic":"foo1","partition":0,"replicas":[3,4]},
                   {"topic":"foo2","partition":2,"replicas":[1,2]},
                   {"topic":"foo2","partition":0,"replicas":[3,4]},
                   {"topic":"foo1","partition":1,"replicas":[2,3]},
                   {"topic":"foo2","partition":1,"replicas":[2,3]}]
    }
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions
    {"version":1,
     "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
                   {"topic":"foo1","partition":0,"replicas":[5,6]},
                   {"topic":"foo2","partition":2,"replicas":[5,6]},
                   {"topic":"foo2","partition":0,"replicas":[5,6]},
                   {"topic":"foo1","partition":1,"replicas":[5,6]},
                   {"topic":"foo2","partition":1,"replicas":[5,6]}]
    }
    
  4. 最后,--verify 选项用来检查parition重新分配的状态:
    • 注意, expand-cluster-reassignment.json(与--execute选项使用的相同)和--verify选项一起使用。
    > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
    Status of partition reassignment:
    Reassignment of partition [foo1,0] completed successfully
    Reassignment of partition [foo1,1] is in progress
    Reassignment of partition [foo1,2] is in progress
    Reassignment of partition [foo2,0] completed successfully
    Reassignment of partition [foo2,1] completed successfully 
    Reassignment of partition [foo2,2] completed successfully
    

自定义分区分配和迁移

分区重新分配工具也可以有选择性将分区副本移动到指定的broker。

  • 当用这种方式,假定你已经知道了分区规则,不需要通过工具生成规则,可以跳过 --generate,直接使用 --execute


例如,下面的例子是移动主题foo1的分区0到brokers 5,6 和主题foo2的分区1到broker 2,3。

  1. 手工写一个自定义的分配计划到json文件中:
    > cat custom-reassignment.json
    {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
    
  2. 然后,--execute 选项执行分配处理:
    > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
                   {"topic":"foo2","partition":1,"replicas":[3,4]}]
    }
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions
    {"version":1,
     "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
                   {"topic":"foo2","partition":1,"replicas":[2,3]}]
    }
    
  3. 最后使用--verify 验证:
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify
    Status of partition reassignment:
    Reassignment of partition [foo1,0] completed successfully
    Reassignment of partition [foo2,1] completed successfully
    

退役brokers

分区重新分配工具还不能自动为退役代理生成重新分配计划。因此,管理员必须制定一个重新分配计划,将托管在 broker 上的所有分区的副本转移到其他代理上。

这可能会相对繁琐,因为重新分配需要确保所有副本从退役的broker迁移到另一个没有停运的broker。

为了简化这个过程,我们计划在将来为退役 broker 添加工具支持。

增加副本

在现有分区增加副本是很容易的,只要指定自定义的重新分配的json文件脚本,并用 --execute 选项去执行这个脚本。


例如,下面的示例将主题 foo 的分区 0 的复制因子从 1 增加到 3。在增加复制因子之前,该分区的唯一副本存在于代理 5 上。作为增加复制因子的一部分,我们将在代理 6 和 7 上添加更多副本:

  1. 手工写一个自定义的分配的json脚本:
    > cat increase-replication-factor.json
    {"version":1,
     "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
    
  2. 然后,用--execute选项运行json脚本:
    > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute
    Current partition replica assignment
    
    {"version":1,
     "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]}
    
    Save this to use as the --reassignment-json-file option during rollback
    Successfully started reassignment of partitions
    {"version":1,
     "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
    
  3. -- version 选项来验证parition分配的状态。注意,使用同样的 increase-replication-factor.json
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify
    Status of partition reassignment:
    Reassignment of partition [foo,0] completed successfully
    
    • 你也可以使用kafka-topic工具验证:
      > bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe
      Topic:foo	PartitionCount:1	ReplicationFactor:3	Configs:
      	Topic: foo	Partition: 0	Leader: 5	Replicas: 5,6,7	Isr: 5,6,7
      

彻底删除topic【???】

kafka0.8.1.1以及之前版本都无法使用类似一条命令就彻底删除topic,以前看过网上一些删除命令不过只是在zookeeper注销信息而已,但是实际的日志内容还是保存在kafka log中。

机器环境如下:

Kafka目录:/usr/local/kafka_2.10-0.8.1.1
日志保存目录log.dirs:/data1/kafka/log/
删除的topic名字:zitest2
  1. 从zookeerer删除信息:
    /usr/local/kafka_2.10-0.8.1.1/bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --zookeeper 10.12.0.91:2181,10.12.0.92:2181,10.12.0.93:2181/kafka--topic zitest2
    
    成功后返回信息:deletion succeeded!
  2. JPS查看kill掉QuorumPeerMain和Kafka进程
  3. 从log.dirs目录删除文件,可以看到多个子目录名字如zitest2-0,zitest2-1…zitest2-n(就是你topic的partition个数)
    rm  –fr  zitest2-0……zitest2-n
    
  4. 修改日志目录的recovery-point-offset-checkpoint和replication-offset-checkpoint文件(要小心删除,否则待会kafka不能正常启动起来)
    replication-offset-checkpoint格式如下:
    0
    4(partition总数)
    zitest2 0 0
    zitest2  3 0
    hehe 0 0
    hehe 1 0
    
    修改后如下:
    0
    2(partition总数)
    hehe 0 0
    hehe 1 0
    
    把含有zitest2行全部去掉,并且把 partition 总数修改为减去 zitest2 的 partition 的剩余数目,同理 recovery-point-offset-checkpoint 也是这样修改。

完成后就可以正常启动zookeeper和kafka。

数据迁移期间限制带宽的使用【???】

Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限。当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响。

有2个接口可以实现限制:

  1. 最简单和最安全的是调用 kafka-reassign-partitions.sh 时加限制。
  2. 另外 kafka-configs.sh 也可以直接查看和修改限制值。


例如,当执行重新平衡时,用下面的命令,它在移动分区时,将不会超过 50MB/s:

$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181 --execute --reassignment-json-file bigger-cluster.json —throttle 50000000

当你运行这个脚本,你会看到这个限制:

The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.

如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行 --execute 命令,用相同的reassignment-json-file:

$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --execute --reassignment-json-file bigger-cluster.json --throttle 700000000

  There is an existing assignment running.
  The throttle limit was set to 700000000 B/s

一旦重新平衡完成,可以使用 --verify 操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。

  • 这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。
当--verify执行,并且重新分配已完成时,此脚本将确认限制被移除:
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181  --verify --reassignment-json-file bigger-cluster.json
  Status of partition reassignment:
  Reassignment of partition [my-topic,1] completed successfully
  Reassignment of partition [mytopic,0] completed successfully
  Throttle was removed.


管理员还可以使用 kafka-configs.sh 验证已分配的配置。有 2 对限制配置用于管理限流。而限制值本身,是个 broker 级别的配置,用于动态属性配置:

leader.replication.throttled.rate
  follower.replication.throttled.rate

此外,还有枚举集合的限流副本:

leader.replication.throttled.replicas
  follower.replication.throttled.replicas

其中每个topic配置,所有4个配置值通过kafka-reassign-partitions.sh(下面讨论)自动分配。

查看限流配置:

$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
  Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
  Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000

这显示了应用于复制协议的leader和follower的限制。默认情况下,2个都分配了相同的限制值。

要查看限流副本的列表:

$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
  Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
      follower.replication.throttled.replicas=1:101,0:102


这里我们看到 leader 限制被应用到 broker 102 上的分区 1 和 broker 101 的分区 0。同样,follower 限制应用到 broker 101 的分区 1 和 broker 102 的分区 0。

默认情况下,kafka-reassign-partitions.sh 会将 leader 限制应用于重新平衡前存在的所有副本,任何一个副本都可能是 leader。它将应用 follower 限制到所有移动目的地。因此,如果 broker 101,102 上有一个副本分区,被分配给 102,103,则该分区的 leader 限制,将被应用到 101,102,并且 follower 限制将仅被应用于103。


如果需要,你还可以使用 kafka-configs.sh 的 --alter 开关手动地更改限制配置。

安全的使用限制复制

在使用限制复制时应特别的小心,特别是:

  1. 限制移除:
    一旦重新分配完成,限制应该及时的移除(通过运行kafka-reassign-partitions —verify移除)。
  2. 确保进展:
    如果限制设置的太低,与传入的写入速率相比,复制可能无法进行:
    max(BytesInPerSec) > throttle
    
    其中BytesInPerSec是监控生产者写入到broker的吞吐量。


可以使用该命令监视重新平衡期间复制是否在进行,使用以下方式:

kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)

在复制期间落后应不断地减少,如果没有缩小,则管理员通过上面介绍的方式增加限制的吞吐量。

设置配额

默认情况下,客户端的配额不受限制。可以为每个(user,client-id),user或client-id分组设置自定义的配额。


配置自定义的配额(user=user1,client-id=clientA):

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.

为 user=user1 配置自定义的配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.

为 client-id=clientA 配置自定义的配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.
  • 可以通过--entity-default为(user,client-id),user或client-id group设置默认的配额。

为user=userA配置默认client-id配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.

为user配置默认配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-default
Updated config for entity: default user-principal.

为client-id配置默认配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-default
Updated config for entity: default client-id.

为指定的(user,client-id)展示配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048

为指定的user展示配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048

为指定的client-id展示配额:

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048

如果没有指定名称,则展示指定的类型的,查看所有user,或(user,client):

> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048
> bin/kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048


可以通过在 broker 中设置以下配置来设置适用于所有 client-id 的默认配额。

  • 仅当未在 Zookeeper 中配置配额覆盖或默认值时,才应用这些属性。
  • 默认情况下,每个 client-id 接收一个无限制的配额。

以下将每个producer和consumer client-id的默认配额设置为10MB /秒。

quota.producer.default=10485760
quota.consumer.default=10485760
  • 请注意,这些属性已被弃用,可能会在将来的版本中删除。 请优先使用kafka-configs.sh。

从老版本升级kafka

从0.8.x, 0.9.x 或 0.10.0.X 升级到 0.10.1.0

0.10.1.0有线协议更改,通过遵循以下建议的滚动升级,在升级期间不会停机。但是,需要注意升0.10.1.0中潜在的突发状况。

注意:

  • 由于引入了新的协议,要在升级客户端之前先升级kafka集群(即,0.10.1.x仅支持 0.10.1.x或更高版本的broker,但是0.10.1.x的broker向下支持旧版本的客户端)


滚动升级:

  1. 更新所有 broker 的 server.properties 文件,并添加以下属性:
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (如:0.8.2.0, 0.9.0.0 或 0.10.0.0).
    • log.message.format.version=CURRENT_KAFKA_VERSION (有关此配置的详细信息,请查看升级后潜在的性能影响。)
  2. 每次升级一个broker:关闭broker,替换新版本,然后重新启动它。
  3. 一旦整个群集升级,通过编辑 inter.broker.protocol.version 并将其设置为 0.10.1.0 来转换所有协议。
  4. 如果之前的消息格式是 0.10.0,则将 log.message.format.version 更改为 0.10.1(这无影响,因为 0.10.0 和 0.10.1 的消息格式是相同的)。
    • 如果之前的消息格式版本低于 0.10.0,还不能更改 log.message.format.version;一旦所有的消费者都已升级到 0.10.0.0 或更高版本时,才能更改此参数。
  5. 逐个重新启动broker,使新协议版本生效。
  6. 如果 log.message.format.version 低于 0.10.0,请等待,知道所有消费者升级到0.10.0或更新的版本,然后将每个broker的 log.message.format.version 更改为 0.10.1。然后逐个重启。

注意:

  • 如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,它们将默认从新版本开始。
  • 变换协议版本和重启启动可以在 broker 升级完成后的任何时间去做,不必马上做。

在0.10.1.0中潜在的变化

  • 新的java消费者不再是测试阶段了,我们建议将其应用到所有的新开发当中。旧的Scala使用仍然支持,但将在下一个版本中弃用,并在未来的主要版本中移除。
  • --new-consumer / --new.consumer 转换不再需要使用 MirrorMaker 和类似于 Console 消费者工具。只需要通过一个 Kafka broker 连接,而不是 ZooKeeper 了。另外,控制台消费者和旧消费者已弃用,并且将在未来的主要版本中移除。
  • Kafka集群现在可通过集群 ID 来标识唯一,broker 升级到 0.10.1.0 时将自动的生成。集群 ID 可通过 kafka.server:type=KafkaServer,name=ClusterId 获取。它是元数据相应的一部分,序列化,客户端拦截器和度量记录器可通过实现 ClusterResourceListener 接口来接收集群ID。
  • BrokerState "RunningAsController" (value 4) 已被移除。由于一个bug,broker 仅在转换出来之前处于这种状态,因此移除影响应该是最小的。
    • 推荐的方法是通过 kafka 检查给定的 broker 是否是控制器:controller:type=KafkaController,name=ActiveControllerCount
  • 新的Java消费者现在允许用户通过分区上的时间戳来搜索offset。
  • 新的Java消费者现在支持后台线程心跳检测,有一个新的配置 max.poll.interval.ms 控制消费者主动离开组之前poll调用之间的最大时间(默认是5 分钟)。配置 request.timeout.ms 的值必须始终大于 max.poll.interval.ms,因为 JoinGroup 请求在消费者重新平衡时候阻塞服务器的最大时间。因此我们更改了其默认值超过5分钟,最后,session.timeout.ms 的默认值已调整为10秒,并 max.poll.records 的默认值更改为 500。
  • 当使用 Authorizer 并且用户对topic没有描述授权时,broker 将不再向请求返回 TOPIC_AUTHORIZATION_FAILED 错误,因为这会泄漏topic名称。 相反,将返回 UNKNOWN_TOPIC_OR_PARTITION 错误代码。 当使用生产者和消费者时,这可能导致意外的超时或延迟,因为Kafka客户端通常将在未知的topic错误时自动重试。 如果您怀疑这可能已经正在发生,你应该查阅客户端日志。
  • 获取响应的默认的限制大小(消费者为50MB,副本为10MB)。现有的分区限制也适用(消费者和副本是1MB)。注意,这些限制不是绝对的最大值(下一节解释)。
  • 如果一个消息大于响应/分区大小限制,消费者和副本可以继续使用。更具体的是,如果在第一个非空分区中的第一个消息大于限制,则消息将仍然返回。
  • kafka.api.FetchRequestkafka.javaapi.FetchRequest 中增加了重载的构造函数。以允许调用者去指定分区的顺序(因为在v3中顺序很重要)。之前的构造函数已弃用。在请求发送之前,以避免资源匮乏问题引起的混洗。

新协议版本

  • ListOffsetRequest v1支持基于时间戳的精确offset搜索。
  • MetadataResponse v2引入了一个新字段:“cluster_id”。
  • FetchRequest v3支持限制响应大小(除了现有的分区限制)。
  • JoinGroup v1引入了一个新字段:“rebalance_timeout”。

从0.8.x 或 0.9.x 升级到 0.10.0.0

0.10.0.0具有潜在的突变更改(请在升级之前查看),以及升级后可能的性能影响。 通过遵循以下建议的滚动升级计划,可保障在升级期间和之后不会出现停机时间和性能影响。 注意:由于引入了新协议,因此在升级客户端之前先升级Kafka集群。

注意:

  • 对于版本 0.9.0.0:由于 0.9.0.0 中有一个bug,依赖于Zookeeper(旧的Scala高级消费者和MirrorMaker如果一起使用)的客户端将无法在 0.10.0.x 中使用。因此,broker升级到 0.10.0.x 之前,先升级 0.9.0.0 客户端到 0.9.0.1。
    • 对于 0.8.X 或 0.9.0.1 客户端,此步骤不是必需的。


滚动升级:

  1. 更新所有 broker 的 server.properties 文件,并添加以下配置:
    • inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如:0.8.2 或 0.9.0.0).
    • log.message.format.version=CURRENT_KAFKA_VERSION (有关此配置的详细信息,请查看升级后潜在的性能影响。)
  2. 升级broker,关闭它,然后升级到新版本,最后重启它。
  3. 一旦整个集群升级完成,通过编辑 inter.broker.protocol.version 设置为0.10.0.0转换所有协议。注意:你现在应该还不需要设置 message.format.version,此配置应该当所有的消费者升级为0.10.0.0时才需要设置。
  4. 依次重新启动broker,使新协议版本生效。
  5. 一旦所有的消费者已经升级为.10.0,设置每个 broker 的 log.message.format.version 为0.10.0,然后逐个重启。

注意:

  • 如果你接受停机目,你可以简单粗暴的关闭所有broker,更新版本并重新启动。它们默认从新协议开始。
  • 变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。

升级到0.10.0.0后潜在的性能影响

0.10.0 中的消息格式包括新的时间戳字段,并使用压缩消息的相关联的 offset。磁盘默认的消息格式是 0.10.0,消息格式可以通过 server.properties 中的 log.message.format.version 配置。如果消费者客户端版本低于 0.10.0.0。它只能“理解” 0.10.0 之前的消息格式。在这种情况下,broker 在发送响应到旧版本消费者之前转换 0.10.0 格式到之前的格式。然而,这样的话,broker 不是零复制传输。在 Kafka 社区关于性能影响的报告显示,在升级后,CPU 利用率从 20% 提高 100%。这迫使所有客户端马升级,促使性能恢复正常。为了避免消费者升级到 0.10.0.0 之前的消息转换,可以设置 log.message.format.version 为 0.8.2 或 0.9.0。这样,broker 仍然零复制传输将数据发送给旧的消费者。一旦消费者升级,就可以把消息格式更为 0.10.0,就可以享受含新时间戳和优化后的压缩新消息格式。转换只是为了确保兼容性,尽可能避免消息转换才是至关重要的。

  • 客户端升级到 0.10.0.0,不会对性能产生影响。


注意:

  • 通过设置消息格式版本,可以证明所有现有消息处于或低于该消息格式版本。否则消费者在0.10.0.0之前可能会中断。特别是,在消息格式设置为0.10.0之后,不应将其更改回较早的格式,因为它可能会在0.10.0.0之前的版本上中断消费者。
  • 由于在每个消息中引入了额外的时间戳,生产者在发送少量消息可能会看到消息吞吐量下降(因为增加了开销)。 同样,复制每个消息传输也增加了8个字节。 如果你集群的能力与网络接近,可能会超过网卡,并看到由于过载的故障和性能问题。
  • 如果生产者已经启用了压缩,则在某些情况下,可能注意到生产者吞吐量减少或broker的压缩率降低。当接收压缩消息时,0.10.0的broker避免再次压缩消息,这样减少延迟并提高吞吐量。然而,在某些情况下,这可能减少生产者的批次大小,导致较差的吞吐量。如果出现这种情况,可调整生产者的linger.ms 和 batch.size以提高吞吐量。另外,生产者用于压缩消息的缓存小于broker生产者使用的缓存,这可能对磁盘上的消息的压缩比有负面影响。 我们打算在未来的Kafka版本中进行配置。

0.10.0.0潜在的变化

  • 从Kafka 0.10.0.0 开始,Kafka中的消息格式版本表示为Kafka的版本。例如,消息格式 0.9.0 指的是支持的最高消息版本就是 0.9.0。
  • 消息格式0.10.0已经介绍过了,并且默认是使用的。消息包含了一个时间戳字段和压缩后消息的关系offset。
  • 已经引入了 ProduceRequest/Response v2,并默认使用支持消息格式0.10.0。
  • 已经引入了 FetchRequest/Response v2已经被引入,它默认使用支持消息格式0.10.0。
  • MessageFormatter 接口从def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) 更改为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
  • MessageReader 接口从 def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] 更改为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
  • MessageFormatter 的包从 kafka.tools 到 kafka.common
  • MessageReader 的包从 kafka.tools 到 kafka.common
  • MirrorMakerMessageHandler不再处理(记录:MessageAndMetadata [Array [Byte],Array [Byte]])方法从未被调用用。
  • 0.7 版本的 KafkaMigrationTool 不再和 kafka 一起打包。如果你需要从0.7迁移到0.10.0,请先迁移到0.8,然后按照的升级步骤从0.8升级到0.10.0。
  • 新消费者API已标准化,接收 java.util.Collection 作为方法参数的序列化类型。升级现有的版本才能使用0.10.0客户端库
  • LZ4压缩消息处理已更改为使用可互操作的规范框架(LZ4f v1.5.1)。为了保留与旧客户端的兼容性,此改变仅适用于消息格式为0.10.0和更高版本。使用v0/v1(消息格式0.9.0)Produce/Fetch LZ4压缩消息的客户端应继续使用0.9.0实现框架。使用Produce/Fetch协议v2或更高版本的客户端应使用可互操作的LZ4f框架。可互操作的LZ4库的列表可在https://www.lz4.org/查看

在0.10.0.0的显著变化

  • 从0.10.0.0开始,增加一个新的客户端 Kafka Streams 客户端,用于流式处理存储在kafka topic的数据。这个新客户端仅支持0.10.x或更高的版本。
  • 新消费者默认 receive.buffer.bytes64 K
  • 新的消费者现在公开了 exclude.internal.topics 配置,以防止内部topic(例如消费者offset topic)被其他的正则匹配订阅。默认是启用。
  • 旧的的Scala的生产者已经弃用。使用者尽快使用最新的Java客户端。

新的消费者API已标记为稳定。

从0.8.0, 0.8.1.X或0.8.2.X升级到0.9.0.0

9.0.0有潜在的中断更改风险(在升级之前需要知道),并且与之前版本的broker之间的协议改变。这意味着此次升级可能和客户端旧版本不兼容。因此在升级客户端之前,先升级kafka集群。如果你使用MirrorMaker下游集群,则同样应首先升级。


滚动升级:

  1. 升级所有 broker 的 server.properties,并在其中添加 inter.broker.protocol.version = 0.8.2.X
  2. 每次升级一个 broker:关闭 broker,替换新版本,然后重新启动。
  3. 一旦整个群集升级,通过编辑 inter.broker.protocol.version 并将其设置为 0.9.0.0 来转换所有协议。
  4. 逐个重新启动 broker,使新协议版本生效。

注意:

  • 如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,协议将默认从新版本开始。
  • 变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。

0.9.0.0潜在的中断变化

  • Java 1.6不再支持。
  • Scala 2.9不再支持。
  • 默认情况下,1000以上的 Broker ID 为自动分配。如果你的集群高于该阈值,需相应地增加 reserved.broker.max.id 配置。
  • replica.lag.max.messages 配置已经移除。分区leader在决定哪些副本处于同步时将不再考虑落后的消息的数。
  • 配置参数 replica.lag.time.max.ms 现在不仅指自上次从副本获取请求后经过的时间,还指自副本上次被捕获以来的时间。 副本仍然从leader获取消息,但超过 replica.lag.time.max.ms 配置的最新消息将被认为不同步的。
  • 压缩的topic不再接受没有key的消息,如果出现,生产者将抛出异常。 在0.8.x中,没有key的消息将导致日志压缩线程退出(并停止所有压缩的topic)。
  • MirrorMaker不再支持多个目标集群。 它只接受一个--consumer.config。 要镜像多个源集群,每个源集群至少需要一个MirrorMaker实例,每个源集群都有自己的消费者配置。
  • 在 org.apache.kafka.clients.tools。包下的Tools已移至org.apache.kafka.tools。所有包含的脚本仍将照常工作,只有直接导入这些类的自定义代码将受到影响。
  • 在kafka-run-class.sh中更改了默认的Kafka JVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)。
  • kafka-topics.sh脚本(kafka.admin.TopicCommand)现在退出,失败时出现非零退出代码。
  • kafka-topics.sh脚本(kafka.admin.TopicCommand)现在将在topic名称由于使用“.”或“_”而导致风险度量标准冲突时打印警告。以及冲突的情况下的错误。
  • kafka-console-producer.sh脚本(kafka.tools.ConsoleProducer)将默认使用新的Java Producer,用户必须指定“old-producer”才能使用旧生产者。
  • 默认情况下,所有命令行工具都会将所有日志消息打印到stderr而不是stdout。

0.9.0.1中的显著变化

  • 可以通过将 broker.id.generation.enable 设置为 false 来禁用新的 broker ID 生成功能。
  • 默认情况下,配置参数 log.cleaner.enable 为true。 这意味着topic会清理。
  • policy = compact 现在将被默认压缩,并且 128 MB 的堆(通过log.cleaner.dedupe.buffer.size)分配给清洗进程。你可能需要根据你对压缩topic的使用情况,查看log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。
  • 默认情况下,新消费者的配置参数 fetch.min.bytes 的默认值为1。

0.9.0.0弃用的

  • kafka-topics.sh 脚本的变更topic配置已弃用(kafka.admin.ConfigCommand),以后将使用kafka-configs.sh(kafka.admin.ConfigCommand) 。
  • kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已弃用,以后将使用kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand)
  • kafka.tools.ProducerPerformance已弃用。以后将使用org.apache.kafka.tools.ProducerPerformance(kafka-producer-perf-test.sh也将使用新类)
  • 生产者的block.on.buffer.full已弃用,并将在以后的版本中移除。目前其默认已经更为false。KafkaProducer将不再抛出BufferExhaustedException,而是使用max.block.ms来中止,之后将抛出TimeoutException。如果block.on.buffer.full属性明确地设置为true,它将设置max.block.ms为Long.MAX_VALUE和metadata.fetch.timeout.ms将不执行。

从0.8.1升级到0.8.2

0.8.2与0.8.1完全兼容。 关闭,更新代码并重新启动,逐个升级broker。

从0.8.0升级到0.8.1

0.8.1与0.8完全兼容。 关闭,更新代码并重新启动,逐个升级broker。

从0.7升级

版本0.7与较新版本不兼容。 对API,ZooKeeper数据结构,协议和配置进行了主要更改,以便添加复制(在0.7中缺失)。 从0.7版升级到更高版本需要一个特殊的迁移工具(通过下一章的API)。 此迁移可以在不停机的情况下完成。