查看“Kafka:基本操作”的源代码
←
Kafka:基本操作
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Kafka]] == 添加、修改、删除 topic == 如果'''第一次发布一个不存在的topic时,它会自动创建'''。也可以手动添加topic。 # 添加、修改 topic: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y </syntaxhighlight> #* 副本控制每个消息在服务器中的备份。 #*: 如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,'''集群中推荐设置 2 或 3 个副本''',才不会中断数据消费。 #* 分区数控制 topic 将分片成多少 log。 #*: 关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。 #* 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。 # 更改topic的配置和分区: ## kafka版本 < 2.2 ##: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y </syntaxhighlight> ## kafka版本 >= 2.2 ##: '''<syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \ --partitions 40 </syntaxhighlight>''' #* 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。 #*: 就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。 # 添加配置: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y </syntaxhighlight> # 移除配置: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x </syntaxhighlight> # 删除 topic: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name </syntaxhighlight> #* topic 删除选项默认是关闭的,设置服务器配置开启它。 #*: <syntaxhighlight lang="bash" highlight=""> delete.topic.enable=true </syntaxhighlight> * Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。 == 优雅地关闭Kafka == <pre> Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。 无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。 对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。 </pre> 当服务器正常停止时,它将利用两种优化: # 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。 # 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。 只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置: <syntaxhighlight lang="xml" highlight=""> controlled.shutdown.enable=true </syntaxhighlight> * 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。 *: 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。 == 平衡leader == 当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。 * 这意味着:在默认情况下,当这个'''broker重新启动之后,它的所有分区都将仅作为follower''',不再用于客户端的读写操作。 为了避免这种不平衡,Kafka 提出了“'''首选副本'''”的概念: : 如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。 您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader : <syntaxhighlight lang="bash" highlight=""> # kafka版本 <= 2.4 > bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot # kafka新版本 > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port </syntaxhighlight> 手动运行很无趣,你可以通过这个配置设置为自动执行: <syntaxhighlight lang="xml" highlight=""> auto.leader.rebalance.enable=true </syntaxhighlight> == 镜像集群之间的数据 == 我们指的是kafka集群之间复制数据“镜像”,为避免在单个集群中的节点之间发生复制混乱的。 kafka附带了kafka集群之间的镜像数据的工具。该工具'''从一个源集群读取和写入到目标集群''',像这样: : [[File:Kafka:镜像集群之间的数据.png|400px]] 常见的用例是镜像在另一个数据中心提供一个副本。 * 你可以运行很多这样的镜像进程来提高吞吐和容错性(如果某个进程挂了,则其他的进程会接管) * 数据从源集群中的topic读取并将其写入到目标集群中相名的topic。事实上,镜像制作不比消费者和生产者连接要好。 * 源和目标集群是完全独立的实体:分区数和offset可以都不相同,就是因为这个原因,镜像集群并不是真的打算作为一个容错机制(消费者位置是不同的),为此,我们推荐使用正常的集群复制。然而,镜像制造将保留和使用分区的消息key,以便每个键基础上保存顺序。 示例,从两个输入集群镜像到一个topic(名为:my-topic): <syntaxhighlight lang="bash" highlight=""> > bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer-1.properties --consumer.config consumer-2.properties --producer.config producer.properties --whitelist my-topic </syntaxhighlight> * 注意,我们用 --whitelist 选项指定topic列表。此选项允许使用java风格的正则表达式。 *: 所以你可以使用 --whitelist 'A|B' ,A和B是镜像名。或者你可以镜像所有topic。也可以使用--whitelist ‘*’镜像所有topic,为了确保引用的正则表达式不会被shell认为是一个文件路径,我们允许使用‘,’ 而不是‘|’指定topic列表。 * 你可以很容易的排除哪些是不需要的,可以用--blacklist来排除,目前--new.consumer不支持。 * 镜像结合配置 auto.create.topics.enable=true,这样副本集群就会自动创建和复制。 == 检查消费者位置 == 有时候需要去查看你的消费者的位置。 我们有一个显示“消费者组中”所有消费者的位置的工具,显示日志其落后多远。如下: : 消费者组名为my-group,消费者topic名为my-topic, <syntaxhighlight lang="bash" highlight=""> > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test Group Topic Pid Offset logSize Lag Owner my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0 my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0 </syntaxhighlight> * 注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker 已经不支持了。你应该使用 '''kafka.admin.ConsumerGroupCommand''' 或 '''bin/kafka-consumer-groups.sh''' 脚本来管理消费者组,包括用新消费者API创建的消费者。 *: <syntaxhighlight lang="bash" highlight=""> ## 0.9+ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group ## 0.10+ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group </syntaxhighlight> === Managing Consumer Groups(管理消费者组) === 用 '''ConsumerGroupCommand''' 工具,我们可以使用 '''list''','''describe''',或 '''delete''' 消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。 * 当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。 示例: # 要列出所有主题中的所有用户组: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list test-consumer-group </syntaxhighlight> # 要像前面的示例中那样使用ConsumerOffsetChecker查看偏移量,可以这样“describe”消费者组: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER test-consumer-group test-foo 0 1 3 2 consumer-1_/127.0.0.1 </syntaxhighlight> 还有一切其他的命令可以提供消费组更多详细信息: * '''-members''':此选项提供使用者组中所有活动成员的列表。 *: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members CONSUMER-ID HOST CLIENT-ID #PARTITIONS consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 </syntaxhighlight> * '''--members --verbose''':除了上述“ --members”选项报告的信息之外,此选项还提供分配给每个成员的分区 *: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0) consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2) consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1) consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 - </syntaxhighlight> * '''-offsets''':默认的describe选项,与“--describe”选项相同的输出。 *: <syntaxhighlight lang="bash" highlight=""> </syntaxhighlight> * '''--state''':此选项提供有用的组级别信息 *: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS localhost:9092 (0) range Stable 4 </syntaxhighlight> 要手动删除一个或多个消费者组,可以使用“'''--delete'''”: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group Deletion of requested consumer groups ('my-group', 'my-other-group') was successful. </syntaxhighlight> 要重置消费者组的offset,可以使用“'''--reset-offsets'''”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:'''--all-topics''' 或 '''--topic'''。 * 除非您使用 '''--from-file''' 方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。 它有3个执行操作命令选项: * (默认)显示要重置的offset * 执行'''--reset-offsets'''处理 * '''--export''':将结果导出为CSV格式。 '''--reset-offsets''' 还具有以下场景可供选择(必须选择至少一个场景): * --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss' * --to-earliest : 将offset重置为最早的offset。 * --to-latest : 将offsets重置为最新的offsets。 * --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。 * --from-file : 将offset重置为CSV文件中定义的值。 * --to-current : 将offset重置为当前的offset。 * --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS” * --to-offset : 将offset重置为指定的。 请注意,超出范围的offset将调整为可用的offset。例如,如果offset最大为10,设置为15时,则实际上将选择offset将为10。 例如,要将消费者组的offset重置为最新的offset: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest TOPIC PARTITION NEW-OFFSET topic1 0 0 </syntaxhighlight> * 如果你使用是老的高级消费者并在zookeeper存储消费者组的元数据(即:offsets.storage=zookeeper),则通过 '''--zookeeper''',而不是 '''bootstrap-server''' *: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list </syntaxhighlight> == 扩大集群 == * *: <syntaxhighlight lang="bash" highlight=""> </syntaxhighlight> <syntaxhighlight lang="bash" highlight=""> </syntaxhighlight>
返回至“
Kafka:基本操作
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息