Kafka:基本操作
添加、修改、删除 topic
如果第一次发布一个不存在的topic时,它会自动创建。也可以手动添加topic。
- 添加、修改 topic:
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
- 副本控制每个消息在服务器中的备份。
- 如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,集群中推荐设置 2 或 3 个副本,才不会中断数据消费。
- 分区数控制 topic 将分片成多少 log。
- 关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。
- 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。
- 更改topic的配置和分区:
- kafka版本 < 2.2
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
- kafka版本 >= 2.2
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \ --partitions 40
- 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。
- 就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。
- kafka版本 < 2.2
- 添加配置:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
- 移除配置:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
- 删除 topic:
> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
- topic 删除选项默认是关闭的,设置服务器配置开启它。
delete.topic.enable=true
- Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。
优雅地关闭Kafka
Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。 无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。 对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。
当服务器正常停止时,它将利用两种优化:
- 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。
- 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。
只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置:
controlled.shutdown.enable=true
- 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。
- 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。
平衡leader
当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。
- 这意味着:在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。
为了避免这种不平衡,Kafka 提出了“首选副本”的概念:
- 如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。
您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader :
# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
手动运行很无趣,你可以通过这个配置设置为自动执行:
auto.leader.rebalance.enable=true
镜像集群之间的数据
我们指的是kafka集群之间复制数据“镜像”,为避免在单个集群中的节点之间发生复制混乱的。
kafka附带了kafka集群之间的镜像数据的工具。该工具从一个源集群读取和写入到目标集群,像这样:
常见的用例是镜像在另一个数据中心提供一个副本。
- 你可以运行很多这样的镜像进程来提高吞吐和容错性(如果某个进程挂了,则其他的进程会接管)
- 数据从源集群中的topic读取并将其写入到目标集群中相名的topic。事实上,镜像制作不比消费者和生产者连接要好。
- 源和目标集群是完全独立的实体:分区数和offset可以都不相同,就是因为这个原因,镜像集群并不是真的打算作为一个容错机制(消费者位置是不同的),为此,我们推荐使用正常的集群复制。然而,镜像制造将保留和使用分区的消息key,以便每个键基础上保存顺序。
示例,从两个输入集群镜像到一个topic(名为:my-topic):
> bin/kafka-run-class.sh kafka.tools.MirrorMaker
--consumer.config consumer-1.properties --consumer.config consumer-2.properties
--producer.config producer.properties --whitelist my-topic
- 注意,我们用 --whitelist 选项指定topic列表。此选项允许使用java风格的正则表达式。
- 所以你可以使用 --whitelist 'A|B' ,A和B是镜像名。或者你可以镜像所有topic。也可以使用--whitelist ‘*’镜像所有topic,为了确保引用的正则表达式不会被shell认为是一个文件路径,我们允许使用‘,’ 而不是‘|’指定topic列表。
- 你可以很容易的排除哪些是不需要的,可以用--blacklist来排除,目前--new.consumer不支持。
- 镜像结合配置 auto.create.topics.enable=true,这样副本集群就会自动创建和复制。
检查消费者位置
有时候需要去查看你的消费者的位置。
我们有一个显示“消费者组中”所有消费者的位置的工具,显示日志其落后多远。如下:
- 消费者组名为my-group,消费者topic名为my-topic,
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
Group Topic Pid Offset logSize Lag Owner
my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0
my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0
- 注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker 已经不支持了。你应该使用 kafka.admin.ConsumerGroupCommand 或 bin/kafka-consumer-groups.sh 脚本来管理消费者组,包括用新消费者API创建的消费者。
## 0.9+ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group ## 0.10+ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Managing Consumer Groups(管理消费者组)
用 ConsumerGroupCommand 工具,我们可以使用 list,describe,或 delete 消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。
- 当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。
示例:
- 要列出所有主题中的所有用户组:
> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list test-consumer-group
- 要像前面的示例中那样使用ConsumerOffsetChecker查看偏移量,可以这样“describe”消费者组:
> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER test-consumer-group test-foo 0 1 3 2 consumer-1_/127.0.0.1
还有一切其他的命令可以提供消费组更多详细信息:
- -members:此选项提供使用者组中所有活动成员的列表。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members CONSUMER-ID HOST CLIENT-ID #PARTITIONS consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0
- --members --verbose:除了上述“ --members”选项报告的信息之外,此选项还提供分配给每个成员的分区
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0) consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2) consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1) consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
- -offsets:默认的describe选项,与“--describe”选项相同的输出。
- --state:此选项提供有用的组级别信息
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS localhost:9092 (0) range Stable 4
要手动删除一个或多个消费者组,可以使用“--delete”:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
要重置消费者组的offset,可以使用“--reset-offsets”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:--all-topics 或 --topic。
- 除非您使用 --from-file 方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。
它有3个执行操作命令选项:
- (默认)显示要重置的offset
- 执行--reset-offsets处理
- --export:将结果导出为CSV格式。
--reset-offsets 还具有以下场景可供选择(必须选择至少一个场景):
- --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss'
- --to-earliest : 将offset重置为最早的offset。
- --to-latest : 将offsets重置为最新的offsets。
- --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。
- --from-file : 将offset重置为CSV文件中定义的值。
- --to-current : 将offset重置为当前的offset。
- --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS”
- --to-offset : 将offset重置为指定的。
请注意,超出范围的offset将调整为可用的offset。例如,如果offset最大为10,设置为15时,则实际上将选择offset将为10。
例如,要将消费者组的offset重置为最新的offset:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0 0
- 如果你使用是老的高级消费者并在zookeeper存储消费者组的元数据(即:offsets.storage=zookeeper),则通过 --zookeeper,而不是 bootstrap-server
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
扩大集群
增加新服务到kafka集群是很容易的,只要为新服务分配一个独一无二的 Broker ID 并启动即可。但是,新的服务不会自动分配到任何数据,因此除非将分区移动到这些服务器上,否则在创建新主题之前,它们不会执行任何工作。
因此,通常在将计算机添加到集群时,您会希望将一些现有数据迁移到这些计算机。
迁移数据的过程是手动启动的,但是执行过程是完全自动化的。在kafka后台内部中,kafka将添加新的服务器,并作为正在迁移分区的 follower,来完全复制该分区现有的数据。当新服务器完全复制该分区的内容并加入同步副本,成为现有副本之一后,就将现有的副本分区上的数据删除。
分区重新分配工具可以用于跨 broker 迁移分区,理想的分区分配将确保所有的 broker 数据负载和分区大小。分区分配工具没有自动研究 kafka 集群的数据分布和迁移分区达到负载分布的能力,因此,管理员要弄清楚哪些topic或分区应该迁移。
分区分配工具的3种模式:
- --generate:在此模式下,给定主题列表和代理列表,以将指定的topic的所有parition都移动到新的broker。(是生成分配规则json文件的)
- 此选项仅提供了一种方便的方法,可以在给定主题和目标代理列表的情况下生成分区重新分配计划。
- --execute:在此模式下,是执行你用 --generate 生成的分配规则json文件的,(用 --reassignment-json-file 选项),可以是自定义的分配计划,也可以是由管理员或通过 --generate 选项生成的。
- --verify:在此模式下,该工具验证在最后一次 --execute 期间列出的所有分区的重新分配状态。状态可以是成功完成、失败或正在进行
自动将数据迁移到新机器
使用分区重新分配工具将从当前的broker集的一些topic移到新添加的broker。同时扩大现有集群,因为这很容易将整个topic移动到新的broker,而不是每次移动一个parition,你要提供新的broker和新broker的目标列表的topic列表(就是刚才的生成的json文件)。然后工具将根据你提供的列表把topic的所有parition均匀地分布在所有的broker,topic的副本保持不变。
例如,下面的例子将主题 foo1,foo2 的所有分区移动到新的 broker 5,6。移动结束后,主题 foo1 和 foo2 所有的分区都会只会在 broker 5,6:
【下面所有的json文件,都是要你自己新建的,不是自动创建的,需要你自己把生成的规则复制到你新建的json文件里,然后执行。】
- 执行迁移工具需要接收一个json文件,首先需要你确认topic的迁移计划并创建json文件,如下所示
> cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 }
- 一旦json文件准备就绪,就可以使用分区重新分配工具来生成候选分配
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }
- 生成从主题 foo1,foo2 迁移所有的分区到 broker 5,6 的候选分配规则。
- 注意,这个时候,迁移还没有开始,它只是告诉你当前分配和新的分配规则,当前分配规则用来回滚,新的分配规则保存在json文件(例如,我保存在 expand-cluster-reassignment.json这个文件下)然后,用--execute选项来执行它。
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] }
- 最后,--verify 选项用来检查parition重新分配的状态:
- 注意, expand-cluster-reassignment.json(与--execute选项使用的相同)和--verify选项一起使用。
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully
自定义分区分配和迁移
分区重新分配工具也可以有选择性将分区副本移动到指定的broker。
- 当用这种方式,假定你已经知道了分区规则,不需要通过工具生成规则,可以跳过 --generate,直接使用 --execute
例如,下面的例子是移动主题foo1的分区0到brokers 5,6 和主题foo2的分区1到broker 2,3。
- 手工写一个自定义的分配计划到json文件中:
> cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]}
- 然后,--execute 选项执行分配处理:
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]}, {"topic":"foo2","partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] }
- 最后使用--verify 验证:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo2,1] completed successfully
退役brokers
分区重新分配工具还不能自动为退役代理生成重新分配计划。因此,管理员必须制定一个重新分配计划,将托管在 broker 上的所有分区的副本转移到其他代理上。 这可能会相对繁琐,因为重新分配需要确保所有副本从退役的broker迁移到另一个没有停运的broker。 为了简化这个过程,我们计划在将来为退役 broker 添加工具支持。
增加副本
在现有分区增加副本是很容易的,只要指定自定义的重新分配的json文件脚本,并用 --execute 选项去执行这个脚本。
例如,下面的示例将主题 foo 的分区 0 的复制因子从 1 增加到 3。在增加复制因子之前,该分区的唯一副本存在于代理 5 上。作为增加复制因子的一部分,我们将在代理 6 和 7 上添加更多副本:
- 手工写一个自定义的分配的json脚本:
> cat increase-replication-factor.json {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
- 然后,用--execute选项运行json脚本:
> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]}
- -- version 选项来验证parition分配的状态。注意,使用同样的 increase-replication-factor.json
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify Status of partition reassignment: Reassignment of partition [foo,0] completed successfully
- 你也可以使用kafka-topic工具验证:
> bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe Topic:foo PartitionCount:1 ReplicationFactor:3 Configs: Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7
彻底删除topic【???】
kafka0.8.1.1以及之前版本都无法使用类似一条命令就彻底删除topic,以前看过网上一些删除命令不过只是在zookeeper注销信息而已,但是实际的日志内容还是保存在kafka log中。
机器环境如下:
Kafka目录:/usr/local/kafka_2.10-0.8.1.1
日志保存目录log.dirs:/data1/kafka/log/
删除的topic名字:zitest2
- 从zookeerer删除信息:
/usr/local/kafka_2.10-0.8.1.1/bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --zookeeper 10.12.0.91:2181,10.12.0.92:2181,10.12.0.93:2181/kafka--topic zitest2
- 成功后返回信息:deletion succeeded!
- JPS查看kill掉QuorumPeerMain和Kafka进程
- 从log.dirs目录删除文件,可以看到多个子目录名字如zitest2-0,zitest2-1…zitest2-n(就是你topic的partition个数)
rm –fr zitest2-0……zitest2-n
- 修改日志目录的recovery-point-offset-checkpoint和replication-offset-checkpoint文件(要小心删除,否则待会kafka不能正常启动起来)
- replication-offset-checkpoint格式如下:
0 4(partition总数) zitest2 0 0 zitest2 3 0 hehe 0 0 hehe 1 0
- 修改后如下:
0 2(partition总数) hehe 0 0 hehe 1 0
- 把含有zitest2行全部去掉,并且把 partition 总数修改为减去 zitest2 的 partition 的剩余数目,同理 recovery-point-offset-checkpoint 也是这样修改。
完成后就可以正常启动zookeeper和kafka。
数据迁移期间限制带宽的使用【???】
Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限。当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响。
有2个接口可以实现限制:
- 最简单和最安全的是调用 kafka-reassign-partitions.sh 时加限制。
- 另外 kafka-configs.sh 也可以直接查看和修改限制值。
例如,当执行重新平衡时,用下面的命令,它在移动分区时,将不会超过 50MB/s:
$ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181 --execute --reassignment-json-file bigger-cluster.json —throttle 50000000
当你运行这个脚本,你会看到这个限制:
The throttle limit was set to 50000000 B/s
Successfully started reassignment of partitions.
如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行 --execute 命令,用相同的reassignment-json-file:
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000
There is an existing assignment running.
The throttle limit was set to 700000000 B/s
一旦重新平衡完成,可以使用 --verify 操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。
- 这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。
- 当--verify执行,并且重新分配已完成时,此脚本将确认限制被移除:
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file bigger-cluster.json
Status of partition reassignment:
Reassignment of partition [my-topic,1] completed successfully
Reassignment of partition [mytopic,0] completed successfully
Throttle was removed.
管理员还可以使用 kafka-configs.sh 验证已分配的配置。有 2 对限制配置用于管理限流。而限制值本身,是个 broker 级别的配置,用于动态属性配置:
leader.replication.throttled.rate
follower.replication.throttled.rate
此外,还有枚举集合的限流副本:
leader.replication.throttled.replicas
follower.replication.throttled.replicas
其中每个topic配置,所有4个配置值通过kafka-reassign-partitions.sh(下面讨论)自动分配。
查看限流配置:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers
Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000
这显示了应用于复制协议的leader和follower的限制。默认情况下,2个都分配了相同的限制值。
要查看限流副本的列表:
$ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics
Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101,
follower.replication.throttled.replicas=1:101,0:102
这里我们看到 leader 限制被应用到 broker 102 上的分区 1 和 broker 101 的分区 0。同样,follower 限制应用到 broker 101 的分区 1 和 broker 102 的分区 0。
默认情况下,kafka-reassign-partitions.sh 会将 leader 限制应用于重新平衡前存在的所有副本,任何一个副本都可能是 leader。它将应用 follower 限制到所有移动目的地。因此,如果 broker 101,102 上有一个副本分区,被分配给 102,103,则该分区的 leader 限制,将被应用到 101,102,并且 follower 限制将仅被应用于103。
如果需要,你还可以使用 kafka-configs.sh 的 --alter 开关手动地更改限制配置。
安全的使用限制复制
在使用限制复制时应特别的小心,特别是:
- 限制移除:
- 一旦重新分配完成,限制应该及时的移除(通过运行kafka-reassign-partitions —verify移除)。
- 确保进展:
- 如果限制设置的太低,与传入的写入速率相比,复制可能无法进行:
max(BytesInPerSec) > throttle
- 其中BytesInPerSec是监控生产者写入到broker的吞吐量。
可以使用该命令监视重新平衡期间复制是否在进行,使用以下方式:
kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+)
在复制期间落后应不断地减少,如果没有缩小,则管理员通过上面介绍的方式增加限制的吞吐量。
设置配额
默认情况下,客户端的配额不受限制。可以为每个(user,client-id),user或client-id分组设置自定义的配额。
配置自定义的配额(user=user1,client-id=clientA):
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Updated config for entity: user-principal 'user1', client-id 'clientA'.
为 user=user1 配置自定义的配额:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1
Updated config for entity: user-principal 'user1'.
为 client-id=clientA 配置自定义的配额:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name clientA
Updated config for entity: client-id 'clientA'.
- 可以通过--entity-default为(user,client-id),user或client-id group设置默认的配额。
为user=userA配置默认client-id配额:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-default
Updated config for entity: user-principal 'user1', default client-id.
为user配置默认配额:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-default
Updated config for entity: default user-principal.
为client-id配置默认配额:
> bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-default
Updated config for entity: default client-id.
为指定的(user,client-id)展示配额:
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048
为指定的user展示配额:
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048
为指定的client-id展示配额:
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA
Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048
如果没有指定名称,则展示指定的类型的,查看所有user,或(user,client):
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048
> bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clients
Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048
Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048
可以通过在 broker 中设置以下配置来设置适用于所有 client-id 的默认配额。
- 仅当未在 Zookeeper 中配置配额覆盖或默认值时,才应用这些属性。
- 默认情况下,每个 client-id 接收一个无限制的配额。
以下将每个producer和consumer client-id的默认配额设置为10MB /秒。
quota.producer.default=10485760
quota.consumer.default=10485760
- 请注意,这些属性已被弃用,可能会在将来的版本中删除。 请优先使用kafka-configs.sh。
从老版本升级kafka
从0.8.x, 0.9.x 或 0.10.0.X 升级到 0.10.1.0
0.10.1.0有线协议更改,通过遵循以下建议的滚动升级,在升级期间不会停机。但是,需要注意升0.10.1.0中潜在的突发状况。
注意:
- 由于引入了新的协议,要在升级客户端之前先升级kafka集群(即,0.10.1.x仅支持 0.10.1.x或更高版本的broker,但是0.10.1.x的broker向下支持旧版本的客户端)
滚动升级:
- 更新所有 broker 的 server.properties 文件,并添加以下属性:
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (如:0.8.2.0, 0.9.0.0 或 0.10.0.0).
- log.message.format.version=CURRENT_KAFKA_VERSION (有关此配置的详细信息,请查看升级后潜在的性能影响。)
- 每次升级一个broker:关闭broker,替换新版本,然后重新启动它。
- 一旦整个群集升级,通过编辑 inter.broker.protocol.version 并将其设置为 0.10.1.0 来转换所有协议。
- 如果之前的消息格式是 0.10.0,则将 log.message.format.version 更改为 0.10.1(这无影响,因为 0.10.0 和 0.10.1 的消息格式是相同的)。
- 如果之前的消息格式版本低于 0.10.0,还不能更改 log.message.format.version;一旦所有的消费者都已升级到 0.10.0.0 或更高版本时,才能更改此参数。
- 逐个重新启动broker,使新协议版本生效。
- 如果 log.message.format.version 低于 0.10.0,请等待,知道所有消费者升级到0.10.0或更新的版本,然后将每个broker的 log.message.format.version 更改为 0.10.1。然后逐个重启。
注意:
- 如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,它们将默认从新版本开始。
- 变换协议版本和重启启动可以在 broker 升级完成后的任何时间去做,不必马上做。
在0.10.1.0中潜在的变化
- 新的java消费者不再是测试阶段了,我们建议将其应用到所有的新开发当中。旧的Scala使用仍然支持,但将在下一个版本中弃用,并在未来的主要版本中移除。
- --new-consumer / --new.consumer 转换不再需要使用 MirrorMaker 和类似于 Console 消费者工具。只需要通过一个 Kafka broker 连接,而不是 ZooKeeper 了。另外,控制台消费者和旧消费者已弃用,并且将在未来的主要版本中移除。
- Kafka集群现在可通过集群 ID 来标识唯一,broker 升级到 0.10.1.0 时将自动的生成。集群 ID 可通过 kafka.server:type=KafkaServer,name=ClusterId 获取。它是元数据相应的一部分,序列化,客户端拦截器和度量记录器可通过实现 ClusterResourceListener 接口来接收集群ID。
- BrokerState "RunningAsController" (value 4) 已被移除。由于一个bug,broker 仅在转换出来之前处于这种状态,因此移除影响应该是最小的。
- 推荐的方法是通过 kafka 检查给定的 broker 是否是控制器:controller:type=KafkaController,name=ActiveControllerCount
- 新的Java消费者现在允许用户通过分区上的时间戳来搜索offset。
- 新的Java消费者现在支持后台线程心跳检测,有一个新的配置 max.poll.interval.ms 控制消费者主动离开组之前poll调用之间的最大时间(默认是5 分钟)。配置 request.timeout.ms 的值必须始终大于 max.poll.interval.ms,因为 JoinGroup 请求在消费者重新平衡时候阻塞服务器的最大时间。因此我们更改了其默认值超过5分钟,最后,session.timeout.ms 的默认值已调整为10秒,并 max.poll.records 的默认值更改为 500。
- 当使用 Authorizer 并且用户对topic没有描述授权时,broker 将不再向请求返回 TOPIC_AUTHORIZATION_FAILED 错误,因为这会泄漏topic名称。 相反,将返回 UNKNOWN_TOPIC_OR_PARTITION 错误代码。 当使用生产者和消费者时,这可能导致意外的超时或延迟,因为Kafka客户端通常将在未知的topic错误时自动重试。 如果您怀疑这可能已经正在发生,你应该查阅客户端日志。
- 获取响应的默认的限制大小(消费者为50MB,副本为10MB)。现有的分区限制也适用(消费者和副本是1MB)。注意,这些限制不是绝对的最大值(下一节解释)。
- 如果一个消息大于响应/分区大小限制,消费者和副本可以继续使用。更具体的是,如果在第一个非空分区中的第一个消息大于限制,则消息将仍然返回。
- kafka.api.FetchRequest 和 kafka.javaapi.FetchRequest 中增加了重载的构造函数。以允许调用者去指定分区的顺序(因为在v3中顺序很重要)。之前的构造函数已弃用。在请求发送之前,以避免资源匮乏问题引起的混洗。
新协议版本
- ListOffsetRequest v1支持基于时间戳的精确offset搜索。
- MetadataResponse v2引入了一个新字段:“cluster_id”。
- FetchRequest v3支持限制响应大小(除了现有的分区限制)。
- JoinGroup v1引入了一个新字段:“rebalance_timeout”。
从0.8.x 或 0.9.x 升级到 0.10.0.0
0.10.0.0具有潜在的突变更改(请在升级之前查看),以及升级后可能的性能影响。 通过遵循以下建议的滚动升级计划,可保障在升级期间和之后不会出现停机时间和性能影响。 注意:由于引入了新协议,因此在升级客户端之前先升级Kafka集群。
注意:
- 对于版本 0.9.0.0:由于 0.9.0.0 中有一个bug,依赖于Zookeeper(旧的Scala高级消费者和MirrorMaker如果一起使用)的客户端将无法在 0.10.0.x 中使用。因此,broker升级到 0.10.0.x 之前,先升级 0.9.0.0 客户端到 0.9.0.1。
- 对于 0.8.X 或 0.9.0.1 客户端,此步骤不是必需的。
滚动升级:
- 更新所有 broker 的 server.properties 文件,并添加以下配置:
- inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如:0.8.2 或 0.9.0.0).
- log.message.format.version=CURRENT_KAFKA_VERSION (有关此配置的详细信息,请查看升级后潜在的性能影响。)
- 升级broker,关闭它,然后升级到新版本,最后重启它。
- 一旦整个集群升级完成,通过编辑 inter.broker.protocol.version 设置为0.10.0.0转换所有协议。注意:你现在应该还不需要设置 message.format.version,此配置应该当所有的消费者升级为0.10.0.0时才需要设置。
- 依次重新启动broker,使新协议版本生效。
- 一旦所有的消费者已经升级为.10.0,设置每个 broker 的 log.message.format.version 为0.10.0,然后逐个重启。
注意:
- 如果你接受停机目,你可以简单粗暴的关闭所有broker,更新版本并重新启动。它们默认从新协议开始。
- 变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。
升级到0.10.0.0后潜在的性能影响
0.10.0 中的消息格式包括新的时间戳字段,并使用压缩消息的相关联的 offset。磁盘默认的消息格式是 0.10.0,消息格式可以通过 server.properties 中的 log.message.format.version 配置。如果消费者客户端版本低于 0.10.0.0。它只能“理解” 0.10.0 之前的消息格式。在这种情况下,broker 在发送响应到旧版本消费者之前转换 0.10.0 格式到之前的格式。然而,这样的话,broker 不是零复制传输。在 Kafka 社区关于性能影响的报告显示,在升级后,CPU 利用率从 20% 提高 100%。这迫使所有客户端马升级,促使性能恢复正常。为了避免消费者升级到 0.10.0.0 之前的消息转换,可以设置 log.message.format.version 为 0.8.2 或 0.9.0。这样,broker 仍然零复制传输将数据发送给旧的消费者。一旦消费者升级,就可以把消息格式更为 0.10.0,就可以享受含新时间戳和优化后的压缩新消息格式。转换只是为了确保兼容性,尽可能避免消息转换才是至关重要的。
- 客户端升级到 0.10.0.0,不会对性能产生影响。
注意:
- 通过设置消息格式版本,可以证明所有现有消息处于或低于该消息格式版本。否则消费者在0.10.0.0之前可能会中断。特别是,在消息格式设置为0.10.0之后,不应将其更改回较早的格式,因为它可能会在0.10.0.0之前的版本上中断消费者。
- 由于在每个消息中引入了额外的时间戳,生产者在发送少量消息可能会看到消息吞吐量下降(因为增加了开销)。 同样,复制每个消息传输也增加了8个字节。 如果你集群的能力与网络接近,可能会超过网卡,并看到由于过载的故障和性能问题。
- 如果生产者已经启用了压缩,则在某些情况下,可能注意到生产者吞吐量减少或broker的压缩率降低。当接收压缩消息时,0.10.0的broker避免再次压缩消息,这样减少延迟并提高吞吐量。然而,在某些情况下,这可能减少生产者的批次大小,导致较差的吞吐量。如果出现这种情况,可调整生产者的linger.ms 和 batch.size以提高吞吐量。另外,生产者用于压缩消息的缓存小于broker生产者使用的缓存,这可能对磁盘上的消息的压缩比有负面影响。 我们打算在未来的Kafka版本中进行配置。
0.10.0.0潜在的变化
- 从Kafka 0.10.0.0 开始,Kafka中的消息格式版本表示为Kafka的版本。例如,消息格式 0.9.0 指的是支持的最高消息版本就是 0.9.0。
- 消息格式0.10.0已经介绍过了,并且默认是使用的。消息包含了一个时间戳字段和压缩后消息的关系offset。
- 已经引入了 ProduceRequest/Response v2,并默认使用支持消息格式0.10.0。
- 已经引入了 FetchRequest/Response v2已经被引入,它默认使用支持消息格式0.10.0。
- MessageFormatter 接口从def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) 更改为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream)
- MessageReader 接口从 def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] 更改为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]]
- MessageFormatter 的包从 kafka.tools 到 kafka.common
- MessageReader 的包从 kafka.tools 到 kafka.common
- MirrorMakerMessageHandler不再处理(记录:MessageAndMetadata [Array [Byte],Array [Byte]])方法从未被调用用。
- 0.7 版本的 KafkaMigrationTool 不再和 kafka 一起打包。如果你需要从0.7迁移到0.10.0,请先迁移到0.8,然后按照的升级步骤从0.8升级到0.10.0。
- 新消费者API已标准化,接收 java.util.Collection 作为方法参数的序列化类型。升级现有的版本才能使用0.10.0客户端库
- LZ4压缩消息处理已更改为使用可互操作的规范框架(LZ4f v1.5.1)。为了保留与旧客户端的兼容性,此改变仅适用于消息格式为0.10.0和更高版本。使用v0/v1(消息格式0.9.0)Produce/Fetch LZ4压缩消息的客户端应继续使用0.9.0实现框架。使用Produce/Fetch协议v2或更高版本的客户端应使用可互操作的LZ4f框架。可互操作的LZ4库的列表可在https://www.lz4.org/查看
在0.10.0.0的显著变化
- 从0.10.0.0开始,增加一个新的客户端 Kafka Streams 客户端,用于流式处理存储在kafka topic的数据。这个新客户端仅支持0.10.x或更高的版本。
- 新消费者默认 receive.buffer.bytes 是 64 K。
- 新的消费者现在公开了 exclude.internal.topics 配置,以防止内部topic(例如消费者offset topic)被其他的正则匹配订阅。默认是启用。
- 旧的的Scala的生产者已经弃用。使用者尽快使用最新的Java客户端。
新的消费者API已标记为稳定。
从0.8.0, 0.8.1.X或0.8.2.X升级到0.9.0.0
9.0.0有潜在的中断更改风险(在升级之前需要知道),并且与之前版本的broker之间的协议改变。这意味着此次升级可能和客户端旧版本不兼容。因此在升级客户端之前,先升级kafka集群。如果你使用MirrorMaker下游集群,则同样应首先升级。
滚动升级:
- 升级所有 broker 的 server.properties,并在其中添加 inter.broker.protocol.version = 0.8.2.X
- 每次升级一个 broker:关闭 broker,替换新版本,然后重新启动。
- 一旦整个群集升级,通过编辑 inter.broker.protocol.version 并将其设置为 0.9.0.0 来转换所有协议。
- 逐个重新启动 broker,使新协议版本生效。
注意:
- 如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,协议将默认从新版本开始。
- 变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。
0.9.0.0潜在的中断变化
- Java 1.6不再支持。
- Scala 2.9不再支持。
- 默认情况下,1000以上的 Broker ID 为自动分配。如果你的集群高于该阈值,需相应地增加 reserved.broker.max.id 配置。
- replica.lag.max.messages 配置已经移除。分区leader在决定哪些副本处于同步时将不再考虑落后的消息的数。
- 配置参数 replica.lag.time.max.ms 现在不仅指自上次从副本获取请求后经过的时间,还指自副本上次被捕获以来的时间。 副本仍然从leader获取消息,但超过 replica.lag.time.max.ms 配置的最新消息将被认为不同步的。
- 压缩的topic不再接受没有key的消息,如果出现,生产者将抛出异常。 在0.8.x中,没有key的消息将导致日志压缩线程退出(并停止所有压缩的topic)。
- MirrorMaker不再支持多个目标集群。 它只接受一个--consumer.config。 要镜像多个源集群,每个源集群至少需要一个MirrorMaker实例,每个源集群都有自己的消费者配置。
- 在 org.apache.kafka.clients.tools。包下的Tools已移至org.apache.kafka.tools。所有包含的脚本仍将照常工作,只有直接导入这些类的自定义代码将受到影响。
- 在kafka-run-class.sh中更改了默认的Kafka JVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)。
- kafka-topics.sh脚本(kafka.admin.TopicCommand)现在退出,失败时出现非零退出代码。
- kafka-topics.sh脚本(kafka.admin.TopicCommand)现在将在topic名称由于使用“.”或“_”而导致风险度量标准冲突时打印警告。以及冲突的情况下的错误。
- kafka-console-producer.sh脚本(kafka.tools.ConsoleProducer)将默认使用新的Java Producer,用户必须指定“old-producer”才能使用旧生产者。
- 默认情况下,所有命令行工具都会将所有日志消息打印到stderr而不是stdout。
0.9.0.1中的显著变化
- 可以通过将 broker.id.generation.enable 设置为 false 来禁用新的 broker ID 生成功能。
- 默认情况下,配置参数 log.cleaner.enable 为true。 这意味着topic会清理。
- policy = compact 现在将被默认压缩,并且 128 MB 的堆(通过log.cleaner.dedupe.buffer.size)分配给清洗进程。你可能需要根据你对压缩topic的使用情况,查看log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。
- 默认情况下,新消费者的配置参数 fetch.min.bytes 的默认值为1。
0.9.0.0弃用的
- kafka-topics.sh 脚本的变更topic配置已弃用(kafka.admin.ConfigCommand),以后将使用kafka-configs.sh(kafka.admin.ConfigCommand) 。
- kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已弃用,以后将使用kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand)
- kafka.tools.ProducerPerformance已弃用。以后将使用org.apache.kafka.tools.ProducerPerformance(kafka-producer-perf-test.sh也将使用新类)
- 生产者的block.on.buffer.full已弃用,并将在以后的版本中移除。目前其默认已经更为false。KafkaProducer将不再抛出BufferExhaustedException,而是使用max.block.ms来中止,之后将抛出TimeoutException。如果block.on.buffer.full属性明确地设置为true,它将设置max.block.ms为Long.MAX_VALUE和metadata.fetch.timeout.ms将不执行。
从0.8.1升级到0.8.2
0.8.2与0.8.1完全兼容。 关闭,更新代码并重新启动,逐个升级broker。
从0.8.0升级到0.8.1
0.8.1与0.8完全兼容。 关闭,更新代码并重新启动,逐个升级broker。
从0.7升级
版本0.7与较新版本不兼容。 对API,ZooKeeper数据结构,协议和配置进行了主要更改,以便添加复制(在0.7中缺失)。 从0.7版升级到更高版本需要一个特殊的迁移工具(通过下一章的API)。 此迁移可以在不停机的情况下完成。