Kafka:基本操作
添加、修改、删除 topic
如果第一次发布一个不存在的topic时,它会自动创建。也可以手动添加topic。
- 添加、修改 topic:
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
--partitions 20 --replication-factor 3 --config x=y
- 副本控制每个消息在服务器中的备份。
- 如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,集群中推荐设置 2 或 3 个副本,才不会中断数据消费。
- 分区数控制 topic 将分片成多少 log。
- 关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。
- 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。
- 副本控制每个消息在服务器中的备份。
- 更改topic的配置和分区:
- kafka版本 < 2.2
> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y
- kafka版本 >= 2.2
> bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \ --partitions 40
- 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。
- 就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。
- kafka版本 < 2.2
- 添加配置:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y
- 移除配置:
> bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x
- 删除 topic:
> bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name
- topic 删除选项默认是关闭的,设置服务器配置开启它。
delete.topic.enable=true
- Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。
优雅地关闭Kafka
Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。 无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。 对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。
当服务器正常停止时,它将利用两种优化:
- 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。
- 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。
只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置:
controlled.shutdown.enable=true
- 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。
- 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。
平衡leader
当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。
- 这意味着:在默认情况下,当这个broker重新启动之后,它的所有分区都将仅作为follower,不再用于客户端的读写操作。
为了避免这种不平衡,Kafka 提出了“首选副本”的概念:
- 如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。
您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader :
# kafka版本 <= 2.4
> bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
# kafka新版本
> bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port
手动运行很无趣,你可以通过这个配置设置为自动执行:
auto.leader.rebalance.enable=true
镜像集群之间的数据
我们指的是kafka集群之间复制数据“镜像”,为避免在单个集群中的节点之间发生复制混乱的。
kafka附带了kafka集群之间的镜像数据的工具。该工具从一个源集群读取和写入到目标集群,像这样:
常见的用例是镜像在另一个数据中心提供一个副本。
- 你可以运行很多这样的镜像进程来提高吞吐和容错性(如果某个进程挂了,则其他的进程会接管)
- 数据从源集群中的topic读取并将其写入到目标集群中相名的topic。事实上,镜像制作不比消费者和生产者连接要好。
- 源和目标集群是完全独立的实体:分区数和offset可以都不相同,就是因为这个原因,镜像集群并不是真的打算作为一个容错机制(消费者位置是不同的),为此,我们推荐使用正常的集群复制。然而,镜像制造将保留和使用分区的消息key,以便每个键基础上保存顺序。
示例,从两个输入集群镜像到一个topic(名为:my-topic):
> bin/kafka-run-class.sh kafka.tools.MirrorMaker
--consumer.config consumer-1.properties --consumer.config consumer-2.properties
--producer.config producer.properties --whitelist my-topic
- 注意,我们用 --whitelist 选项指定topic列表。此选项允许使用java风格的正则表达式。
- 所以你可以使用 --whitelist 'A|B' ,A和B是镜像名。或者你可以镜像所有topic。也可以使用--whitelist ‘*’镜像所有topic,为了确保引用的正则表达式不会被shell认为是一个文件路径,我们允许使用‘,’ 而不是‘|’指定topic列表。
- 你可以很容易的排除哪些是不需要的,可以用--blacklist来排除,目前--new.consumer不支持。
- 镜像结合配置 auto.create.topics.enable=true,这样副本集群就会自动创建和复制。
检查消费者位置
有时候需要去查看你的消费者的位置。
我们有一个显示“消费者组中”所有消费者的位置的工具,显示日志其落后多远。如下:
- 消费者组名为my-group,消费者topic名为my-topic,
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
Group Topic Pid Offset logSize Lag Owner
my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0
my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0
- 注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker 已经不支持了。你应该使用 kafka.admin.ConsumerGroupCommand 或 bin/kafka-consumer-groups.sh 脚本来管理消费者组,包括用新消费者API创建的消费者。
## 0.9+ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group ## 0.10+ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
Managing Consumer Groups(管理消费者组)
用 ConsumerGroupCommand 工具,我们可以使用 list,describe,或 delete 消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。
- 当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。
示例:
- 要列出所有主题中的所有用户组:
> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list test-consumer-group
- 要像前面的示例中那样使用ConsumerOffsetChecker查看偏移量,可以这样“describe”消费者组:
> bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER test-consumer-group test-foo 0 1 3 2 consumer-1_/127.0.0.1
还有一切其他的命令可以提供消费组更多详细信息:
- -members:此选项提供使用者组中所有活动成员的列表。
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members CONSUMER-ID HOST CLIENT-ID #PARTITIONS consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0
- --members --verbose:除了上述“ --members”选项报告的信息之外,此选项还提供分配给每个成员的分区
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0) consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2) consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1) consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 -
- -offsets:默认的describe选项,与“--describe”选项相同的输出。
- --state:此选项提供有用的组级别信息
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS localhost:9092 (0) range Stable 4
要手动删除一个或多个消费者组,可以使用“--delete”:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group
Deletion of requested consumer groups ('my-group', 'my-other-group') was successful.
要重置消费者组的offset,可以使用“--reset-offsets”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:--all-topics 或 --topic。
- 除非您使用 --from-file 方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。
它有3个执行操作命令选项:
- (默认)显示要重置的offset
- 执行--reset-offsets处理
- --export:将结果导出为CSV格式。
--reset-offsets 还具有以下场景可供选择(必须选择至少一个场景):
- --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss'
- --to-earliest : 将offset重置为最早的offset。
- --to-latest : 将offsets重置为最新的offsets。
- --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。
- --from-file : 将offset重置为CSV文件中定义的值。
- --to-current : 将offset重置为当前的offset。
- --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS”
- --to-offset : 将offset重置为指定的。
请注意,超出范围的offset将调整为可用的offset。例如,如果offset最大为10,设置为15时,则实际上将选择offset将为10。
例如,要将消费者组的offset重置为最新的offset:
> bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest
TOPIC PARTITION NEW-OFFSET
topic1 0 0
- 如果你使用是老的高级消费者并在zookeeper存储消费者组的元数据(即:offsets.storage=zookeeper),则通过 --zookeeper,而不是 bootstrap-server
> bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
扩大集群