查看“Kafka:基本操作”的源代码
←
Kafka:基本操作
跳到导航
跳到搜索
因为以下原因,您没有权限编辑本页:
您请求的操作仅限属于该用户组的用户执行:
用户
您可以查看和复制此页面的源代码。
[[category:Kafka]] == 添加、修改、删除 topic == 如果'''第一次发布一个不存在的topic时,它会自动创建'''。也可以手动添加topic。 # 添加、修改 topic: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y </syntaxhighlight> #* 副本控制每个消息在服务器中的备份。 #*: 如果有3个副本,那么最多允许有 2 个节点宕掉才能不丢数据,'''集群中推荐设置 2 或 3 个副本''',才不会中断数据消费。 #* 分区数控制 topic 将分片成多少 log。 #*: 关于分区数的影响,首先每个分区必须完整的存储在单个的服务器上。因此,如果你有20个分区的话(读和写的负载),那么完整的数据集将不超过20个服务器(不计算备份)。最后,分区数影响消费者的最大并发。 #* 命令行上添加的配置覆盖了服务器的默认设置,服务器有关于时间长度的数据,应该保留。 # 更改topic的配置和分区: ## kafka版本 < 2.2 ##: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name --partitions 20 --replication-factor 3 --config x=y </syntaxhighlight> ## kafka版本 >= 2.2 ##: '''<syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --bootstrap-server broker_host:port --alter --topic my_topic_name \ --partitions 40 </syntaxhighlight>''' #* 请注意,分区的一种用例是在语义上对数据进行分区,添加分区不会更改现有数据的分区,因此如果消费者依赖该分区,可能会打扰消费者。 #*: 就是说,如果数据是通过“hash(key) % number_of_partition”进行分区划分的,那么该分区可能会因为添加分区而被搅乱,但是Kafka不会尝试以任何方式自动重新分发数据。 # 添加配置: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --add-config x=y </syntaxhighlight> # 移除配置: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --bootstrap-server broker_host:port --entity-type topics --entity-name my_topic_name --alter --delete-config x </syntaxhighlight> # 删除 topic: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --bootstrap-server broker_host:port --delete --topic my_topic_name </syntaxhighlight> #* topic 删除选项默认是关闭的,设置服务器配置开启它。 #*: <syntaxhighlight lang="bash" highlight=""> delete.topic.enable=true </syntaxhighlight> * Kafka 目前不支持减少分区数和改变备份数,但是可以通过迁移脚本来实现【???】。 == 优雅地关闭Kafka == <pre> Kafka集群将自动检测任何代理关闭或故障,并为该计算机上的分区选举新的领导者。 无论服务器发生故障还是为了维护或更改配置而故意关闭,都会发生这种情况。 对于后一种情况,Kafka支持一种更优雅的机制,即先停止服务器,然后再终止它。 </pre> 当服务器正常停止时,它将利用两种优化: # 它将所有日志同步到磁盘上,以避免重新启动时需要进行任何日志恢复(即验证日志尾部所有消息的校验和)。 日志恢复需要时间,因此可以加快有意重启的速度。 # 它将在关闭服务器之前将服务器所领导的所有分区迁移到其他副本。 这将使领导层转移更快,并将每个分区不可用的时间减少到几毫秒。 只要服务器停止运行(不是通过强行终止),都会自动同步日志,但受控的领导层迁移需要使用特殊设置: <syntaxhighlight lang="xml" highlight=""> controlled.shutdown.enable=true </syntaxhighlight> * 注意:只有在 broker 上托管的所有分区都有副本的情况下(即复制因子大于 1 且至少有一个副本处于活动状态),受控关闭才会成功。 *: 这通常是您想要的,因为关闭最后一个副本会使该主题分区不可用。 == 平衡leader == 当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。 * 这意味着:在默认情况下,当这个'''broker重新启动之后,它的所有分区都将仅作为follower''',不再用于客户端的读写操作。 为了避免这种不平衡,Kafka 提出了“'''首选副本'''”的概念: : 如果分区的副本列表为1、5、9,则节点1将优先作为其他两个副本5和9的 Leader,因为它在副本列表中较早。 您可以让 Kafka群集通过运行以下命令,尝试将已恢复的副本恢复为 Leader : <syntaxhighlight lang="bash" highlight=""> # kafka版本 <= 2.4 > bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot # kafka新版本 > bin/kafka-preferred-replica-election.sh --bootstrap-server broker_host:port </syntaxhighlight> 手动运行很无趣,你可以通过这个配置设置为自动执行: <syntaxhighlight lang="xml" highlight=""> auto.leader.rebalance.enable=true </syntaxhighlight> == 镜像集群之间的数据 == 我们指的是kafka集群之间复制数据“镜像”,为避免在单个集群中的节点之间发生复制混乱的。 kafka附带了kafka集群之间的镜像数据的工具。该工具'''从一个源集群读取和写入到目标集群''',像这样: : [[File:Kafka:镜像集群之间的数据.png|400px]] 常见的用例是镜像在另一个数据中心提供一个副本。 * 你可以运行很多这样的镜像进程来提高吞吐和容错性(如果某个进程挂了,则其他的进程会接管) * 数据从源集群中的topic读取并将其写入到目标集群中相名的topic。事实上,镜像制作不比消费者和生产者连接要好。 * 源和目标集群是完全独立的实体:分区数和offset可以都不相同,就是因为这个原因,镜像集群并不是真的打算作为一个容错机制(消费者位置是不同的),为此,我们推荐使用正常的集群复制。然而,镜像制造将保留和使用分区的消息key,以便每个键基础上保存顺序。 示例,从两个输入集群镜像到一个topic(名为:my-topic): <syntaxhighlight lang="bash" highlight=""> > bin/kafka-run-class.sh kafka.tools.MirrorMaker --consumer.config consumer-1.properties --consumer.config consumer-2.properties --producer.config producer.properties --whitelist my-topic </syntaxhighlight> * 注意,我们用 --whitelist 选项指定topic列表。此选项允许使用java风格的正则表达式。 *: 所以你可以使用 --whitelist 'A|B' ,A和B是镜像名。或者你可以镜像所有topic。也可以使用--whitelist ‘*’镜像所有topic,为了确保引用的正则表达式不会被shell认为是一个文件路径,我们允许使用‘,’ 而不是‘|’指定topic列表。 * 你可以很容易的排除哪些是不需要的,可以用--blacklist来排除,目前--new.consumer不支持。 * 镜像结合配置 auto.create.topics.enable=true,这样副本集群就会自动创建和复制。 == 检查消费者位置 == 有时候需要去查看你的消费者的位置。 我们有一个显示“消费者组中”所有消费者的位置的工具,显示日志其落后多远。如下: : 消费者组名为my-group,消费者topic名为my-topic, <syntaxhighlight lang="bash" highlight=""> > bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test Group Topic Pid Offset logSize Lag Owner my-group my-topic 0 0 0 0 test_jkreps-mn-1394154511599-60744496-0 my-group my-topic 1 0 0 0 test_jkreps-mn-1394154521217-1a0be913-0 </syntaxhighlight> * 注意:在0.9.0.0,kafka.tools.ConsumerOffsetChecker 已经不支持了。你应该使用 '''kafka.admin.ConsumerGroupCommand''' 或 '''bin/kafka-consumer-groups.sh''' 脚本来管理消费者组,包括用新消费者API创建的消费者。 *: <syntaxhighlight lang="bash" highlight=""> ## 0.9+ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group test-consumer-group ## 0.10+ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group </syntaxhighlight> === Managing Consumer Groups(管理消费者组) === 用 '''ConsumerGroupCommand''' 工具,我们可以使用 '''list''','''describe''',或 '''delete''' 消费者组(注意,删除只有在分组元数据存储在zookeeper的才可用)。 * 当使用新消费者API(broker协调处理分区和重新平衡),当该组的最后一个提交的偏移到期时,该组被删除。 示例: # 要列出所有主题中的所有用户组: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --list test-consumer-group </syntaxhighlight> # 要像前面的示例中那样使用ConsumerOffsetChecker查看偏移量,可以这样“describe”消费者组: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server broker1:9092 --describe --group test-consumer-group GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER test-consumer-group test-foo 0 1 3 2 consumer-1_/127.0.0.1 </syntaxhighlight> 还有一切其他的命令可以提供消费组更多详细信息: * '''-members''':此选项提供使用者组中所有活动成员的列表。 *: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members CONSUMER-ID HOST CLIENT-ID #PARTITIONS consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 </syntaxhighlight> * '''--members --verbose''':除了上述“ --members”选项报告的信息之外,此选项还提供分配给每个成员的分区 *: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --members --verbose CONSUMER-ID HOST CLIENT-ID #PARTITIONS ASSIGNMENT consumer1-3fc8d6f1-581a-4472-bdf3-3515b4aee8c1 /127.0.0.1 consumer1 2 topic1(0), topic2(0) consumer4-117fe4d3-c6c1-4178-8ee9-eb4a3954bee0 /127.0.0.1 consumer4 1 topic3(2) consumer2-e76ea8c3-5d30-4299-9005-47eb41f3d3c4 /127.0.0.1 consumer2 3 topic2(1), topic3(0,1) consumer3-ecea43e4-1f01-479f-8349-f9130b75d8ee /127.0.0.1 consumer3 0 - </syntaxhighlight> * '''-offsets''':默认的describe选项,与“--describe”选项相同的输出。 *: <syntaxhighlight lang="bash" highlight=""> </syntaxhighlight> * '''--state''':此选项提供有用的组级别信息 *: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group --state COORDINATOR (ID) ASSIGNMENT-STRATEGY STATE #MEMBERS localhost:9092 (0) range Stable 4 </syntaxhighlight> 要手动删除一个或多个消费者组,可以使用“'''--delete'''”: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group my-group --group my-other-group Deletion of requested consumer groups ('my-group', 'my-other-group') was successful. </syntaxhighlight> 要重置消费者组的offset,可以使用“'''--reset-offsets'''”选项。此选项同一时间只支持一个消费者组操作。它需要定义以下范围:'''--all-topics''' 或 '''--topic'''。 * 除非您使用 '''--from-file''' 方案,否则必须选择一个范围。另外,首先请确保消费者实例处于非活动状态。 它有3个执行操作命令选项: * (默认)显示要重置的offset * 执行'''--reset-offsets'''处理 * '''--export''':将结果导出为CSV格式。 '''--reset-offsets''' 还具有以下场景可供选择(必须选择至少一个场景): * --reset-offsets :将offset重置为与日期时间的offset。 格式:'YYYY-MM-DDTHH:mm:SS.sss' * --to-earliest : 将offset重置为最早的offset。 * --to-latest : 将offsets重置为最新的offsets。 * --shift-by : 重置offsets,通过移位“n”,其中“ n”可以为正或负。 * --from-file : 将offset重置为CSV文件中定义的值。 * --to-current : 将offset重置为当前的offset。 * --by-duration : 将offset重置为从当前时间戳重置为持续时间offset。格式:“ PnDTnHnMnS” * --to-offset : 将offset重置为指定的。 请注意,超出范围的offset将调整为可用的offset。例如,如果offset最大为10,设置为15时,则实际上将选择offset将为10。 例如,要将消费者组的offset重置为最新的offset: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group consumergroup1 --topic topic1 --to-latest TOPIC PARTITION NEW-OFFSET topic1 0 0 </syntaxhighlight> * 如果你使用是老的高级消费者并在zookeeper存储消费者组的元数据(即:offsets.storage=zookeeper),则通过 '''--zookeeper''',而不是 '''bootstrap-server''' *: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list </syntaxhighlight> == 扩大集群 == <pre> 增加新服务到kafka集群是很容易的,只要为新服务分配一个独一无二的 Broker ID 并启动即可。但是,新的服务不会自动分配到任何数据,因此除非将分区移动到这些服务器上,否则在创建新主题之前,它们不会执行任何工作。 </pre> 因此,通常在将计算机添加到集群时,您会希望将一些现有'''数据迁移'''到这些计算机。 '''迁移数据的过程是手动启动的,但是执行过程是完全自动化的'''。在kafka后台内部中,kafka将添加新的服务器,并作为正在迁移分区的 follower,来完全复制该分区现有的数据。当新服务器完全复制该分区的内容并加入同步副本,成为现有副本之一后,就将现有的副本分区上的数据删除。 分区重新分配工具可以用于跨 broker 迁移分区,理想的分区分配将确保所有的 broker 数据负载和分区大小。分区分配工具没有自动研究 kafka 集群的数据分布和迁移分区达到负载分布的能力,因此,管理员要弄清楚哪些topic或分区应该迁移。 分区分配工具的3种模式: # '''--generate''':在此模式下,给定主题列表和代理列表,以将指定的topic的所有parition都移动到新的broker。(是'''生成分配规则json文件的''') #* 此选项仅提供了一种方便的方法,可以在给定主题和目标代理列表的情况下生成分区重新分配计划。 # '''--execute''':在此模式下,是执行你用 --generate 生成的分配规则json文件的,(用 --reassignment-json-file 选项),可以是自定义的分配计划,也可以是由管理员或通过 --generate 选项生成的。 # '''--verify''':在此模式下,该工具验证在最后一次 --execute 期间列出的所有分区的重新分配状态。状态可以是成功完成、失败或正在进行 === 自动将数据迁移到新机器 === <pre> 使用分区重新分配工具将从当前的broker集的一些topic移到新添加的broker。同时扩大现有集群,因为这很容易将整个topic移动到新的broker,而不是每次移动一个parition,你要提供新的broker和新broker的目标列表的topic列表(就是刚才的生成的json文件)。然后工具将根据你提供的列表把topic的所有parition均匀地分布在所有的broker,topic的副本保持不变。 </pre> 例如,下面的例子将主题 foo1,foo2 的所有分区移动到新的 broker 5,6。移动结束后,主题 foo1 和 foo2 所有的分区都会只会在 broker 5,6: 【下面所有的json文件,都是要你自己新建的,不是自动创建的,需要你自己把生成的规则复制到你新建的json文件里,然后执行。】 # 执行迁移工具需要接收一个json文件,首先需要你确认topic的迁移计划并创建json文件,如下所示 #: <syntaxhighlight lang="bash" highlight=""> > cat topics-to-move.json {"topics": [{"topic": "foo1"}, {"topic": "foo2"}], "version":1 } </syntaxhighlight> # 一旦json文件准备就绪,就可以使用分区重新分配工具来生成候选分配 #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Proposed partition reassignment configuration {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] } </syntaxhighlight> # 生成从主题 foo1,foo2 迁移所有的分区到 broker 5,6 的候选分配规则。 #* 注意,这个时候,迁移还没有开始,它只是告诉你当前分配和新的分配规则,当前分配规则用来回滚,新的分配规则保存在json文件(例如,我保存在 expand-cluster-reassignment.json这个文件下)然后,用--execute选项来执行它。 #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]}, {"topic":"foo1","partition":0,"replicas":[3,4]}, {"topic":"foo2","partition":2,"replicas":[1,2]}, {"topic":"foo2","partition":0,"replicas":[3,4]}, {"topic":"foo1","partition":1,"replicas":[2,3]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]}, {"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":2,"replicas":[5,6]}, {"topic":"foo2","partition":0,"replicas":[5,6]}, {"topic":"foo1","partition":1,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[5,6]}] } </syntaxhighlight> # 最后,--verify 选项用来检查parition重新分配的状态: #* 注意, expand-cluster-reassignment.json(与--execute选项使用的相同)和--verify选项一起使用。 #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo1,1] is in progress Reassignment of partition [foo1,2] is in progress Reassignment of partition [foo2,0] completed successfully Reassignment of partition [foo2,1] completed successfully Reassignment of partition [foo2,2] completed successfully </syntaxhighlight> === 自定义分区分配和迁移 === 分区重新分配工具也可以有选择性将分区副本移动到指定的broker。 * 当用这种方式,假定你已经知道了分区规则,不需要通过工具生成规则,可以跳过 --generate,直接使用 --execute 例如,下面的例子是移动主题foo1的分区0到brokers 5,6 和主题foo2的分区1到broker 2,3。 # 手工写一个自定义的分配计划到json文件中: #: <syntaxhighlight lang="bash" highlight=""> > cat custom-reassignment.json {"version":1,"partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},{"topic":"foo2","partition":1,"replicas":[2,3]}]} </syntaxhighlight> # 然后,--execute 选项执行分配处理: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]}, {"topic":"foo2","partition":1,"replicas":[3,4]}] } Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]}, {"topic":"foo2","partition":1,"replicas":[2,3]}] } </syntaxhighlight> # 最后使用--verify 验证: #: <syntaxhighlight lang="bash" highlight=""> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --verify Status of partition reassignment: Reassignment of partition [foo1,0] completed successfully Reassignment of partition [foo2,1] completed successfully </syntaxhighlight> == 退役brokers == <pre> 分区重新分配工具还不能自动为退役代理生成重新分配计划。因此,管理员必须制定一个重新分配计划,将托管在 broker 上的所有分区的副本转移到其他代理上。 这可能会相对繁琐,因为重新分配需要确保所有副本从退役的broker迁移到另一个没有停运的broker。 为了简化这个过程,我们计划在将来为退役 broker 添加工具支持。 </pre> == 增加副本 == 在现有分区增加副本是很容易的,只要指定自定义的重新分配的json文件脚本,并用 --execute 选项去执行这个脚本。 例如,下面的示例将主题 foo 的分区 0 的复制因子从 1 增加到 3。在增加复制因子之前,该分区的唯一副本存在于代理 5 上。作为增加复制因子的一部分,我们将在代理 6 和 7 上添加更多副本: # 手工写一个自定义的分配的json脚本: #: <syntaxhighlight lang="bash" highlight=""> > cat increase-replication-factor.json {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]} </syntaxhighlight> # 然后,用--execute选项运行json脚本: #: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --execute Current partition replica assignment {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5]}]} Save this to use as the --reassignment-json-file option during rollback Successfully started reassignment of partitions {"version":1, "partitions":[{"topic":"foo","partition":0,"replicas":[5,6,7]}]} </syntaxhighlight> # -- version 选项来验证parition分配的状态。注意,使用同样的 increase-replication-factor.json #: <syntaxhighlight lang="bash" highlight=""> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-replication-factor.json --verify Status of partition reassignment: Reassignment of partition [foo,0] completed successfully </syntaxhighlight> #* 你也可以使用kafka-topic工具验证: #*: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-topics.sh --zookeeper localhost:2181 --topic foo --describe Topic:foo PartitionCount:1 ReplicationFactor:3 Configs: Topic: foo Partition: 0 Leader: 5 Replicas: 5,6,7 Isr: 5,6,7 </syntaxhighlight> == 彻底删除topic【???】 == <pre> kafka0.8.1.1以及之前版本都无法使用类似一条命令就彻底删除topic,以前看过网上一些删除命令不过只是在zookeeper注销信息而已,但是实际的日志内容还是保存在kafka log中。 </pre> 机器环境如下: <syntaxhighlight lang="xml" highlight=""> Kafka目录:/usr/local/kafka_2.10-0.8.1.1 日志保存目录log.dirs:/data1/kafka/log/ 删除的topic名字:zitest2 </syntaxhighlight> # 从zookeerer删除信息: #: <syntaxhighlight lang="bash" highlight=""> /usr/local/kafka_2.10-0.8.1.1/bin/kafka-run-class.shkafka.admin.DeleteTopicCommand --zookeeper 10.12.0.91:2181,10.12.0.92:2181,10.12.0.93:2181/kafka--topic zitest2 </syntaxhighlight> #: 成功后返回信息:deletion succeeded! # JPS查看kill掉QuorumPeerMain和Kafka进程 #: <syntaxhighlight lang="bash" highlight=""> </syntaxhighlight> # 从log.dirs目录删除文件,可以看到多个子目录名字如zitest2-0,zitest2-1…zitest2-n(就是你topic的partition个数) #: <syntaxhighlight lang="bash" highlight=""> rm –fr zitest2-0……zitest2-n </syntaxhighlight> # 修改日志目录的recovery-point-offset-checkpoint和replication-offset-checkpoint文件(要小心删除,否则待会kafka不能正常启动起来) #: replication-offset-checkpoint格式如下: #: <syntaxhighlight lang="bash" highlight=""> 0 4(partition总数) zitest2 0 0 zitest2 3 0 hehe 0 0 hehe 1 0 </syntaxhighlight> #: 修改后如下: #: <syntaxhighlight lang="bash" highlight=""> 0 2(partition总数) hehe 0 0 hehe 1 0 </syntaxhighlight> #: 把含有zitest2行全部去掉,并且把 partition 总数修改为减去 zitest2 的 partition 的剩余数目,同理 recovery-point-offset-checkpoint 也是这样修改。 完成后就可以正常启动zookeeper和kafka。 == 数据迁移期间限制带宽的使用【???】 == <pre> Kafka提供一个broker之间复制传输的流量限制,限制了副本从机器到另一台机器的带宽上限。当重新平衡集群,引导新broker,添加或移除broker时候,这是很有用的。因为它限制了这些密集型的数据操作从而保障了对用户的影响。 </pre> 有2个接口可以实现限制: # 最简单和最安全的是调用 '''kafka-reassign-partitions.sh''' 时加限制。 # 另外 '''kafka-configs.sh''' 也可以直接查看和修改限制值。 例如,当执行重新平衡时,用下面的命令,它在移动分区时,将不会超过 50MB/s: <syntaxhighlight lang="bash" highlight=""> $ bin/kafka-reassign-partitions.sh --zookeeper myhost:2181 --execute --reassignment-json-file bigger-cluster.json —throttle 50000000 </syntaxhighlight> 当你运行这个脚本,你会看到这个限制: <syntaxhighlight lang="bash" highlight=""> The throttle limit was set to 50000000 B/s Successfully started reassignment of partitions. </syntaxhighlight> 如果你想在重新平衡期间修改限制,增加吞吐量,以便完成的更快。你可以重新运行 '''--execute''' 命令,用相同的reassignment-json-file: <syntaxhighlight lang="bash" highlight=""> $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --execute --reassignment-json-file bigger-cluster.json --throttle 700000000 There is an existing assignment running. The throttle limit was set to 700000000 B/s </syntaxhighlight> 一旦重新平衡完成,可以使用 '''--verify''' 操作验证重新平衡的状态。如果重新平衡已经完成,限制也会通过--verify命令移除。 * 这点很重要,因为一旦重新平衡完成,并通过--veriry操作及时移除限制。否则可能会导致定期复制操作的流量也受到限制。 : 当--verify执行,并且重新分配已完成时,此脚本将确认限制被移除: <syntaxhighlight lang="bash" highlight=""> $ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --verify --reassignment-json-file bigger-cluster.json Status of partition reassignment: Reassignment of partition [my-topic,1] completed successfully Reassignment of partition [mytopic,0] completed successfully Throttle was removed. </syntaxhighlight> 管理员还可以使用 kafka-configs.sh 验证已分配的配置。有 2 对限制配置用于管理限流。而限制值本身,是个 broker 级别的配置,用于动态属性配置: <syntaxhighlight lang="bash" highlight=""> leader.replication.throttled.rate follower.replication.throttled.rate </syntaxhighlight> 此外,还有枚举集合的限流副本: <syntaxhighlight lang="bash" highlight=""> leader.replication.throttled.replicas follower.replication.throttled.replicas </syntaxhighlight> 其中每个topic配置,所有4个配置值通过kafka-reassign-partitions.sh(下面讨论)自动分配。 查看限流配置: <syntaxhighlight lang="bash" highlight=""> $ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type brokers Configs for brokers '2' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000 Configs for brokers '1' are leader.replication.throttled.rate=700000000,follower.replication.throttled.rate=700000000 </syntaxhighlight> 这显示了应用于复制协议的leader和follower的限制。默认情况下,2个都分配了相同的限制值。 要查看限流副本的列表: <syntaxhighlight lang="bash" highlight=""> $ bin/kafka-configs.sh --describe --zookeeper localhost:2181 --entity-type topics Configs for topic 'my-topic' are leader.replication.throttled.replicas=1:102,0:101, follower.replication.throttled.replicas=1:101,0:102 </syntaxhighlight> 这里我们看到 leader 限制被应用到 broker 102 上的分区 1 和 broker 101 的分区 0。同样,follower 限制应用到 broker 101 的分区 1 和 broker 102 的分区 0。 默认情况下,kafka-reassign-partitions.sh 会将 leader 限制应用于重新平衡前存在的所有副本,任何一个副本都可能是 leader。它将应用 follower 限制到所有移动目的地。因此,如果 broker 101,102 上有一个副本分区,被分配给 102,103,则该分区的 leader 限制,将被应用到 101,102,并且 follower 限制将仅被应用于103。 如果需要,你还可以使用 kafka-configs.sh 的 '''--alter''' 开关手动地更改限制配置。 === 安全的使用限制复制 === 在使用限制复制时应特别的小心,特别是: # 限制移除: #: 一旦重新分配完成,限制应该及时的移除(通过运行kafka-reassign-partitions —verify移除)。 # 确保进展: #: 如果限制设置的太低,与传入的写入速率相比,复制可能无法进行: #: <syntaxhighlight lang="bash" highlight=""> max(BytesInPerSec) > throttle </syntaxhighlight> #: 其中BytesInPerSec是监控生产者写入到broker的吞吐量。 可以使用该命令监视重新平衡期间复制是否在进行,使用以下方式: <syntaxhighlight lang="bash" highlight=""> kafka.server:type=FetcherLagMetrics,name=ConsumerLag,clientId=([-.\w]+),topic=([-.\w]+),partition=([0-9]+) </syntaxhighlight> 在复制期间落后应不断地减少,如果没有缩小,则管理员通过上面介绍的方式增加限制的吞吐量。 === 设置配额 === 默认情况下,客户端的配额不受限制。可以为每个(user,client-id),user或client-id分组设置自定义的配额。 配置自定义的配额(user=user1,client-id=clientA): <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-name clientA Updated config for entity: user-principal 'user1', client-id 'clientA'. </syntaxhighlight> 为 user=user1 配置自定义的配额: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 Updated config for entity: user-principal 'user1'. </syntaxhighlight> 为 client-id=clientA 配置自定义的配额: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name clientA Updated config for entity: client-id 'clientA'. </syntaxhighlight> * 可以通过'''--entity-default'''为(user,client-id),user或client-id group设置默认的配额。 为user=userA配置默认client-id配额: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-name user1 --entity-type clients --entity-default Updated config for entity: user-principal 'user1', default client-id. </syntaxhighlight> 为user配置默认配额: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type users --entity-default Updated config for entity: default user-principal. </syntaxhighlight> 为client-id配置默认配额: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-default Updated config for entity: default client-id. </syntaxhighlight> 为指定的(user,client-id)展示配额: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 --entity-type clients --entity-name clientA Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048 </syntaxhighlight> 为指定的user展示配额: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name user1 Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048 </syntaxhighlight> 为指定的client-id展示配额: <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA Configs for client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048 </syntaxhighlight> 如果没有指定名称,则展示指定的类型的,查看所有user,或(user,client): <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users Configs for user-principal 'user1' are producer_byte_rate=1024,consumer_byte_rate=2048 Configs for default user-principal are producer_byte_rate=1024,consumer_byte_rate=2048 </syntaxhighlight> <syntaxhighlight lang="bash" highlight=""> > bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-type clients Configs for user-principal 'user1', default client-id are producer_byte_rate=1024,consumer_byte_rate=2048 Configs for user-principal 'user1', client-id 'clientA' are producer_byte_rate=1024,consumer_byte_rate=2048 </syntaxhighlight> 可以通过在 broker 中设置以下配置来设置适用于所有 client-id 的默认配额。 * 仅当未在 Zookeeper 中配置配额覆盖或默认值时,才应用这些属性。 * 默认情况下,每个 client-id 接收一个无限制的配额。 以下将每个producer和consumer client-id的默认配额设置为10MB /秒。 <syntaxhighlight lang="bash" highlight=""> quota.producer.default=10485760 quota.consumer.default=10485760 </syntaxhighlight> * 请注意,这些属性已被弃用,可能会在将来的版本中删除。 请优先使用kafka-configs.sh。 == 从老版本升级kafka == === 从0.8.x, 0.9.x 或 0.10.0.X 升级到 0.10.1.0 === 0.10.1.0有线协议更改,通过遵循以下建议的滚动升级,在升级期间不会停机。但是,需要注意升0.10.1.0中潜在的突发状况。 注意: * 由于引入了新的协议,要在升级客户端之前先升级kafka集群(即,0.10.1.x仅支持 0.10.1.x或更高版本的broker,但是0.10.1.x的broker向下支持旧版本的客户端) 滚动升级: # 更新所有 broker 的 '''server.properties''' 文件,并添加以下属性: #* inter.broker.protocol.version=CURRENT_KAFKA_VERSION (如:0.8.2.0, 0.9.0.0 或 0.10.0.0). #* log.message.format.version=CURRENT_KAFKA_VERSION (有关此配置的详细信息,请查看升级后潜在的性能影响。) # 每次升级一个broker:关闭broker,替换新版本,然后重新启动它。 # 一旦整个群集升级,通过编辑 '''inter.broker.protocol.version''' 并将其设置为 0.10.1.0 来转换所有协议。 # 如果之前的消息格式是 0.10.0,则将 '''log.message.format.version''' 更改为 0.10.1(这无影响,因为 0.10.0 和 0.10.1 的消息格式是相同的)。 #* 如果之前的消息格式版本低于 0.10.0,还不能更改 log.message.format.version;一旦所有的消费者都已升级到 0.10.0.0 或更高版本时,才能更改此参数。 # 逐个重新启动broker,使新协议版本生效。 # 如果 '''log.message.format.version''' 低于 0.10.0,请等待,知道所有消费者升级到0.10.0或更新的版本,然后将每个broker的 log.message.format.version 更改为 0.10.1。然后逐个重启。 注意: * 如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,它们将默认从新版本开始。 * 变换协议版本和重启启动可以在 broker 升级完成后的任何时间去做,不必马上做。 ==== 在0.10.1.0中潜在的变化 ==== * 新的java消费者不再是测试阶段了,我们建议将其应用到所有的新开发当中。旧的Scala使用仍然支持,但将在下一个版本中弃用,并在未来的主要版本中移除。 * '''--new-consumer''' / '''--new.consumer''' 转换不再需要使用 MirrorMaker 和类似于 Console 消费者工具。只需要通过一个 Kafka broker 连接,而不是 ZooKeeper 了。另外,控制台消费者和旧消费者已弃用,并且将在未来的主要版本中移除。 * Kafka集群现在可通过集群 ID 来标识唯一,broker 升级到 0.10.1.0 时将自动的生成。集群 ID 可通过 '''kafka.server:type=KafkaServer,name=ClusterId''' 获取。它是元数据相应的一部分,序列化,客户端拦截器和度量记录器可通过实现 '''ClusterResourceListener''' 接口来接收集群ID。 * BrokerState "RunningAsController" (value 4) 已被移除。由于一个bug,broker 仅在转换出来之前处于这种状态,因此移除影响应该是最小的。 #* 推荐的方法是通过 kafka 检查给定的 broker 是否是控制器:controller:type=KafkaController,name=ActiveControllerCount * 新的Java消费者现在允许用户通过分区上的时间戳来搜索offset。 * 新的Java消费者现在支持后台线程心跳检测,有一个新的配置 '''max.poll.interval.ms''' 控制消费者主动离开组之前poll调用之间的最大时间(默认是'''5 分钟''')。配置 '''request.timeout.ms''' 的值必须始终大于 max.poll.interval.ms,因为 JoinGroup 请求在消费者重新平衡时候阻塞服务器的最大时间。因此我们更改了其默认值超过5分钟,最后,'''session.timeout.ms''' 的默认值已调整为10秒,并 '''max.poll.records''' 的默认值更改为 500。 * 当使用 Authorizer 并且用户对topic没有描述授权时,broker 将不再向请求返回 '''TOPIC_AUTHORIZATION_FAILED''' 错误,因为这会泄漏topic名称。 相反,将返回 '''UNKNOWN_TOPIC_OR_PARTITION''' 错误代码。 当使用生产者和消费者时,这可能导致意外的超时或延迟,因为Kafka客户端通常将在未知的topic错误时自动重试。 如果您怀疑这可能已经正在发生,你应该查阅客户端日志。 * 获取响应的默认的限制大小(消费者为50MB,副本为10MB)。现有的分区限制也适用(消费者和副本是1MB)。注意,这些限制不是绝对的最大值(下一节解释)。 * 如果一个消息大于响应/分区大小限制,消费者和副本可以继续使用。更具体的是,如果在第一个非空分区中的第一个消息大于限制,则消息将仍然返回。 * '''kafka.api.FetchRequest''' 和 '''kafka.javaapi.FetchRequest''' 中增加了重载的构造函数。以允许调用者去指定分区的顺序(因为在v3中顺序很重要)。之前的构造函数已弃用。在请求发送之前,以避免资源匮乏问题引起的混洗。 ==== 新协议版本 ==== * ListOffsetRequest v1支持基于时间戳的精确offset搜索。 * MetadataResponse v2引入了一个新字段:“cluster_id”。 * FetchRequest v3支持限制响应大小(除了现有的分区限制)。 * JoinGroup v1引入了一个新字段:“rebalance_timeout”。 === 从0.8.x 或 0.9.x 升级到 0.10.0.0 === 0.10.0.0具有潜在的突变更改(请在升级之前查看),以及升级后可能的性能影响。 通过遵循以下建议的滚动升级计划,可保障在升级期间和之后不会出现停机时间和性能影响。 注意:由于引入了新协议,因此在升级客户端之前先升级Kafka集群。 注意: * 对于版本 0.9.0.0:由于 0.9.0.0 中有一个bug,依赖于Zookeeper(旧的Scala高级消费者和MirrorMaker如果一起使用)的客户端将无法在 0.10.0.x 中使用。因此,broker升级到 0.10.0.x 之前,先升级 0.9.0.0 客户端到 0.9.0.1。 ** 对于 0.8.X 或 0.9.0.1 客户端,此步骤不是必需的。 滚动升级: # 更新所有 broker 的 '''server.properties''' 文件,并添加以下配置: #* inter.broker.protocol.version=CURRENT_KAFKA_VERSION (例如:0.8.2 或 0.9.0.0). #*log.message.format.version=CURRENT_KAFKA_VERSION (有关此配置的详细信息,请查看升级后潜在的性能影响。) # 升级broker,关闭它,然后升级到新版本,最后重启它。 # 一旦整个集群升级完成,通过编辑 inter.broker.protocol.version 设置为0.10.0.0转换所有协议。注意:你现在应该还不需要设置 message.format.version,此配置应该当所有的消费者升级为0.10.0.0时才需要设置。 # 依次重新启动broker,使新协议版本生效。 # 一旦所有的消费者已经升级为.10.0,设置每个 broker 的 log.message.format.version 为0.10.0,然后逐个重启。 注意: * 如果你接受停机目,你可以简单粗暴的关闭所有broker,更新版本并重新启动。它们默认从新协议开始。 * 变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。 ==== 升级到0.10.0.0后潜在的性能影响 ==== 0.10.0 中的消息格式包括新的时间戳字段,并使用压缩消息的相关联的 offset。磁盘默认的消息格式是 0.10.0,消息格式可以通过 server.properties 中的 log.message.format.version 配置。如果消费者客户端版本低于 0.10.0.0。它只能“理解” 0.10.0 之前的消息格式。在这种情况下,broker 在发送响应到旧版本消费者之前转换 0.10.0 格式到之前的格式。然而,这样的话,broker 不是零复制传输。在 Kafka 社区关于性能影响的报告显示,在升级后,CPU 利用率从 20% 提高 100%。这迫使所有客户端马升级,促使性能恢复正常。为了避免消费者升级到 0.10.0.0 之前的消息转换,可以设置 log.message.format.version 为 0.8.2 或 0.9.0。这样,broker 仍然零复制传输将数据发送给旧的消费者。一旦消费者升级,就可以把消息格式更为 0.10.0,就可以享受含新时间戳和优化后的压缩新消息格式。转换只是为了确保兼容性,尽可能避免消息转换才是至关重要的。 * 客户端升级到 0.10.0.0,不会对性能产生影响。 注意: * 通过设置消息格式版本,可以证明所有现有消息处于或低于该消息格式版本。否则消费者在0.10.0.0之前可能会中断。特别是,在消息格式设置为0.10.0之后,不应将其更改回较早的格式,因为它可能会在0.10.0.0之前的版本上中断消费者。 * 由于在每个消息中引入了额外的时间戳,生产者在发送少量消息可能会看到消息吞吐量下降(因为增加了开销)。 同样,复制每个消息传输也增加了8个字节。 如果你集群的能力与网络接近,可能会超过网卡,并看到由于过载的故障和性能问题。 * 如果生产者已经启用了压缩,则在某些情况下,可能注意到生产者吞吐量减少或broker的压缩率降低。当接收压缩消息时,0.10.0的broker避免再次压缩消息,这样减少延迟并提高吞吐量。然而,在某些情况下,这可能减少生产者的批次大小,导致较差的吞吐量。如果出现这种情况,可调整生产者的linger.ms 和 batch.size以提高吞吐量。另外,生产者用于压缩消息的缓存小于broker生产者使用的缓存,这可能对磁盘上的消息的压缩比有负面影响。 我们打算在未来的Kafka版本中进行配置。 ==== 0.10.0.0潜在的变化 ==== * 从Kafka 0.10.0.0 开始,Kafka中的消息格式版本表示为Kafka的版本。例如,消息格式 0.9.0 指的是支持的最高消息版本就是 0.9.0。 * 消息格式0.10.0已经介绍过了,并且默认是使用的。消息包含了一个时间戳字段和压缩后消息的关系offset。 * 已经引入了 '''ProduceRequest'''/Response v2,并默认使用支持消息格式0.10.0。 * 已经引入了 '''FetchRequest'''/Response v2已经被引入,它默认使用支持消息格式0.10.0。 * '''MessageFormatter''' 接口从def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) 更改为 def writeTo(consumerRecord: ConsumerRecord[Array[Byte], Array[Byte]], output: PrintStream) * '''MessageReader''' 接口从 def readMessage(): KeyedMessage[Array[Byte], Array[Byte]] 更改为 def readMessage(): ProducerRecord[Array[Byte], Array[Byte]] * '''MessageFormatter''' 的包从 kafka.tools 到 kafka.common * MessageReader 的包从 kafka.tools 到 kafka.common * MirrorMakerMessageHandler不再处理(记录:MessageAndMetadata [Array [Byte],Array [Byte]])方法从未被调用用。 * 0.7 版本的 KafkaMigrationTool 不再和 kafka 一起打包。如果你需要从0.7迁移到0.10.0,请先迁移到0.8,然后按照的升级步骤从0.8升级到0.10.0。 * 新消费者API已标准化,接收 '''java.util.Collection''' 作为方法参数的序列化类型。升级现有的版本才能使用0.10.0客户端库 * LZ4压缩消息处理已更改为使用可互操作的规范框架(LZ4f v1.5.1)。为了保留与旧客户端的兼容性,此改变仅适用于消息格式为0.10.0和更高版本。使用v0/v1(消息格式0.9.0)Produce/Fetch LZ4压缩消息的客户端应继续使用0.9.0实现框架。使用Produce/Fetch协议v2或更高版本的客户端应使用可互操作的LZ4f框架。可互操作的LZ4库的列表可在https://www.lz4.org/查看 ==== 在0.10.0.0的显著变化 ==== * 从0.10.0.0开始,增加一个新的客户端 Kafka '''Streams''' 客户端,用于流式处理存储在kafka topic的数据。这个新客户端仅支持0.10.x或更高的版本。 * 新消费者默认 '''receive.buffer.bytes''' 是 '''64 K'''。 * 新的消费者现在公开了 '''exclude.internal.topics''' 配置,以防止内部topic(例如消费者offset topic)被其他的正则匹配订阅。默认是启用。 * 旧的的Scala的生产者已经弃用。使用者尽快使用最新的Java客户端。 新的消费者API已标记为稳定。 === 从0.8.0, 0.8.1.X或0.8.2.X升级到0.9.0.0 === 9.0.0有潜在的中断更改风险(在升级之前需要知道),并且与之前版本的broker之间的协议改变。这意味着此次升级可能和客户端旧版本不兼容。因此在升级客户端之前,先升级kafka集群。如果你使用MirrorMaker下游集群,则同样应首先升级。 滚动升级: # 升级所有 broker 的 server.properties,并在其中添加 inter.broker.protocol.version = 0.8.2.X # 每次升级一个 broker:关闭 broker,替换新版本,然后重新启动。 # 一旦整个群集升级,通过编辑 inter.broker.protocol.version 并将其设置为 0.9.0.0 来转换所有协议。 # 逐个重新启动 broker,使新协议版本生效。 注意: * 如果你可接受停机,你可以简单地将所有broker关闭,更新版本并重启启动,协议将默认从新版本开始。 * 变换协议版本和重启启动可以在broker升级完成后的任何时间去做,不必马上做。 ==== 0.9.0.0潜在的中断变化 ==== * Java 1.6不再支持。 * Scala 2.9不再支持。 * 默认情况下,1000以上的 Broker ID 为自动分配。如果你的集群高于该阈值,需相应地增加 '''reserved.broker.max.id''' 配置。 * replica.lag.max.messages 配置已经移除。分区leader在决定哪些副本处于同步时将不再考虑落后的消息的数。 * 配置参数 replica.lag.time.max.ms 现在不仅指自上次从副本获取请求后经过的时间,还指自副本上次被捕获以来的时间。 副本仍然从leader获取消息,但超过 replica.lag.time.max.ms 配置的最新消息将被认为不同步的。 * 压缩的topic不再接受没有key的消息,如果出现,生产者将抛出异常。 在0.8.x中,没有key的消息将导致日志压缩线程退出(并停止所有压缩的topic)。 * MirrorMaker不再支持多个目标集群。 它只接受一个--consumer.config。 要镜像多个源集群,每个源集群至少需要一个MirrorMaker实例,每个源集群都有自己的消费者配置。 * 在 org.apache.kafka.clients.tools。包下的Tools已移至org.apache.kafka.tools。所有包含的脚本仍将照常工作,只有直接导入这些类的自定义代码将受到影响。 * 在kafka-run-class.sh中更改了默认的Kafka JVM性能选项(KAFKA_JVM_PERFORMANCE_OPTS)。 * kafka-topics.sh脚本(kafka.admin.TopicCommand)现在退出,失败时出现非零退出代码。 * kafka-topics.sh脚本(kafka.admin.TopicCommand)现在将在topic名称由于使用“.”或“_”而导致风险度量标准冲突时打印警告。以及冲突的情况下的错误。 * kafka-console-producer.sh脚本(kafka.tools.ConsoleProducer)将默认使用新的Java Producer,用户必须指定“old-producer”才能使用旧生产者。 * 默认情况下,所有命令行工具都会将所有日志消息打印到stderr而不是stdout。 ==== 0.9.0.1中的显著变化 ==== * 可以通过将 broker.id.generation.enable 设置为 false 来禁用新的 broker ID 生成功能。 * 默认情况下,配置参数 log.cleaner.enable 为true。 这意味着topic会清理。 * policy = compact 现在将被默认压缩,并且 128 MB 的堆(通过log.cleaner.dedupe.buffer.size)分配给清洗进程。你可能需要根据你对压缩topic的使用情况,查看log.cleaner.dedupe.buffer.size和其他log.cleaner配置值。 * 默认情况下,新消费者的配置参数 fetch.min.bytes 的默认值为1。 ==== 0.9.0.0弃用的 ==== * kafka-topics.sh 脚本的变更topic配置已弃用(kafka.admin.ConfigCommand),以后将使用kafka-configs.sh(kafka.admin.ConfigCommand) 。 * kafka-consumer-offset-checker.sh(kafka.tools.ConsumerOffsetChecker)已弃用,以后将使用kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) * kafka.tools.ProducerPerformance已弃用。以后将使用org.apache.kafka.tools.ProducerPerformance(kafka-producer-perf-test.sh也将使用新类) * 生产者的block.on.buffer.full已弃用,并将在以后的版本中移除。目前其默认已经更为false。KafkaProducer将不再抛出BufferExhaustedException,而是使用max.block.ms来中止,之后将抛出TimeoutException。如果block.on.buffer.full属性明确地设置为true,它将设置max.block.ms为Long.MAX_VALUE和metadata.fetch.timeout.ms将不执行。 === 从0.8.1升级到0.8.2 === 0.8.2与0.8.1完全兼容。 关闭,更新代码并重新启动,逐个升级broker。 === 从0.8.0升级到0.8.1 === 0.8.1与0.8完全兼容。 关闭,更新代码并重新启动,逐个升级broker。 === 从0.7升级 === 版本0.7与较新版本不兼容。 对API,ZooKeeper数据结构,协议和配置进行了主要更改,以便添加复制(在0.7中缺失)。 从0.7版升级到更高版本需要一个特殊的迁移工具(通过下一章的API)。 此迁移可以在不停机的情况下完成。
返回至“
Kafka:基本操作
”。
导航菜单
个人工具
登录
命名空间
页面
讨论
大陆简体
已展开
已折叠
查看
阅读
查看源代码
查看历史
更多
已展开
已折叠
搜索
导航
首页
最近更改
随机页面
MediaWiki帮助
笔记
服务器
数据库
后端
前端
工具
《To do list》
日常
阅读
电影
摄影
其他
Software
Windows
WIKIOE
所有分类
所有页面
侧边栏
站点日志
工具
链入页面
相关更改
特殊页面
页面信息