Kafka:Connect
关于
Kafka Connect 是一个可扩展、可靠的在Kafka和其他系统之间流传输的数据工具。
它可以通过connectors(连接器)简单、快速的将大集合数据导入和导出kafka。Kafka Connect可以接收整个数据库或收集来自所有的应用程序的消息到Kafka Topic。使这些数据可用于低延迟流处理。导出可以把topic的数据发送到secondary storage(辅助存储也叫二级存储)也可以发送到查询系统或批处理系统做离线分析。
Kafka Connect功能包括:
- Kafka连接器通用框架:Kafka Connect 规范了kafka与其他数据系统集成,简化了connector的开发、部署和管理。
- 分布式和单机模式:扩展到大型支持整个organization的集中管理服务,也可缩小到开发,测试和小规模生产部署。
- REST 接口:使用 REST API 来提交并管理 Kafka Connect 集群。
- 自动的offset管理:从 connector 获取少量的信息,Kafka Connect 来管理 offset 的提交,所以 connector 的开发者不需要担心这个容易出错的部分。
- 分布式和默认扩展:Kafka Connect 建立在现有的组管理协议上。更多的工作可以添加扩展到 Kafka Connect 集群。
- 流/批量集成:利用kafka现有的能力,Kafka Connect 是一个桥接流和批量数据系统的理想解决方案。
用户指南
提供了一个快速入门的例子,运行一个单机版的Kafka Connect。
运行Kafka Connect
Kafka Connect目前支持两种执行模式:独立(单进程)和分布式。
- 在独立模式下,所有的工作都在一个单进程中进行的。这样易于配置,在一些情况下,只有一个在工作是好的(例如,收集日志文件),但它不会从kafka Connection的功能受益,如容错。
- 通过下面的命令开始一个单进程的例子:
> bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
- 第一个参数是 worker(工人)的配置,这包括 Kafka连接的参数设置,序列化格式,以及频繁地提交offset(偏移量)。
- 其余的参数是connector(连接器)配置文件。你可以配置你需要的,但是所有的执行都在同一个进程(在不同的线程)。
- 分布式的模式会自动平衡。允许你动态的扩展(或缩减),并在执行任务期间和配置、偏移量提交中提供容错保障,非常类似于独立模式:
bin/connect-distributed.sh config/connect-distributed.properties
在不同的类中,配置参数定义了Kafka Connect如何处理,哪里存储配置,如何分配work,哪里存储offset和任务状态。在分布式模式中,Kafka Connect在topic中存储offset,配置和任务状态。建议手动创建offset的topic,可以自己来定义需要的分区数和副本数。如果启动Kafka Connect时还没有创建topic,那么topic将自动创建(使用默认的分区和副本),这可能不是最合适的(因为kafka可不知道业务需要,只能根据默认参数创建)。
特别是以下配置参数尤为关键,启动集群之前设置:
- group.id (默认connect-cluster):Connect cluster group使用唯一的名称;注意这不能和consumer group ID(消费者组)冲突。
- config.storage.topic (默认connect-configs):topic用于存储connector和任务配置;注意,这应该是一个单个的partition,多副本的topic。你需要手动创建这个topic,以确保是单个partition(自动创建的可能会有多个partition)。
- offset.storage.topic (默认 connect-offsets):topic用于存储offsets;这个topic应该配置多个partition和副本。
- status.storage.topic (默认 connect-status):topic 用于存储状态;这个topic 可以有多个partitions和副本
注意,在分布式模式中,connector(连接器)配置不能使用命令行。要使用下面介绍的 REST API 来创建,修改和销毁 connector。
配置连接器(connector)
Connector 的配置是简单的 key-value 映射。
- 对于独立模式,这些都是在属性文件中定义,并通过在命令行上的Connect处理。
3 在分布式模式,JSON负载connector的创建(或修改)请求。
大多数配置都是依赖的connector,有几个常见的选项:
- name:连接器唯一的名称,不能重复。
- connector.calss:连接器的Java类。
- tasks.max:连接器创建任务的最大数。
其中:
- connector.class 配置支持多种格式:全名或连接器类的别名。
- 比如连接器是 org.apache.kafka.connect.file.FileStreamSinkConnector,你可以指定全名,也可以使用 FileStreamSink 或 FileStreamSinkConnector。
- Sink connector也有一个额外的选项来控制它们的输入:
- topics:作为连接器的输入的topic列表。
REST API
由于 Kafka Connect 的目的是作为一个服务运行,提供了一个用于管理 connector 的 REST API。默认情况下,此服务的端口是 8083。
以下是当前支持的终端入口:
- GET /connectors:返回活跃的connector列表
- POST /connectors:创建一个新的connector;请求的主体是一个包含字符串name字段和对象config字段(connector的配置参数)的JSON对象。
- GET /connectors/{name}:获取指定connector的信息
- GET /connectors/{name}/config:获取指定connector的配置参数
- PUT /connectors/{name}/config:更新指定connector的配置参数
- GET /connectors/{name}/status:获取connector的当前状态,包括它是否正在运行,失败,暂停等。
- GET /connectors/{name}/tasks:获取当前正在运行的connector的任务列表。
- GET /connectors/{name}/tasks/{taskid}/status:获取任务的当前状态,包括是否是运行中的,失败的,暂停的等,
- PUT /connectors/{name}/pause:暂停连接器和它的任务,停止消息处理,直到connector恢复。
- PUT /connectors/{name}/resume:恢复暂停的connector(如果connector没有暂停,则什么都不做)
- POST /connectors/{name}/restart:重启connector(connector已故障)
- POST /connectors/{name}/tasks/{taskId}/restart:重启单个任务 (通常这个任务已失败)
- DELETE /connectors/{name}:删除connector, 停止所有的任务并删除其配置
Kafka Connector 还提供了获取有关 connector plugins 信息的 REST API:
- GET /connector-plugins:返回已在 Kafka Connect 集群安装的 connector plugin 列表。
- 请注意,API仅验证处理请求的worker的connector。这意味着你可能看到不一致的结果,特别是在滚动升级的时候(添加新的connector jar)
- PUT /connector-plugins/{connector-type}/config/validate:对提供的配置值进行验证,执行对每个配置验证,返回验证的建议值和错误信息。
开发指南【???】
开发者怎么样编写新的connector,用于kafka和其他系统之间的数据移动。
核心概念和API【???】
在Kafka和其他系统之间复制数据,用户创建自定义的从系统中 pull 数据或 push 数据到系统的 Connector(连接器)。
Connector有两种形式:
- SourceConnectors:从其他系统导入数据(如:JDBCSourceConnector将导入一个关系型数据库到Kafka);
- SinkConnectors:导出数据(如:HDFSSinkConnector将kafka主题的内容导出到HDFS文件)。
connector 不会执行任何复制自己的数据:它们的配置展示了要复制的数据,而Connector是负责打破这一工作变成一组可以分配worker的任务。
这些任务也有两种相对应的形式:
- SourceTask;
- SinkTask。
在手里的任务,每个任务必须复制其子集的数据或Kafka的。
- 在Kafka Connect,这些任务作为一组具有一致性模式的记录(消息)组成的输出和输入流。
有时,这种映射是明显的:在一组日志文件,每个文件可以被视为一个流,每个分析的行形成一个记录,使用相同的模式和offset存储在文件中的字节偏移。
在其他的情况下可能需要更多的努力来映射到该模型:一个JDBC连接器可以将每张表映射到一个流,但offset是不太清楚的。
一种可能的映射使用时间戳列来生成查询递增返回新的数据,上次查询时间戳可被用作offset。
流和记录(Streams and Records)
每个流都应该有一个 key-value 的记录序列。
key 和 value 可以具有复杂的结构:提供了许多原始类型,但数组、对象和嵌套的数据结构也可以。
运行时,数据格式不承担任何特定的序列化格式,这种转换是由框架内部处理的。
除了key和value,记录(由源和传递到sink产生的)关联的流ID和offset。
这些都是使用了框架。定期提交的offset的数据(已处理的),以便在发生故障时,处理可以从最后一个提交的偏移量恢复,避免不必要的重复处理。
动态连接器(Dynamic Connectors)
并非所有的工作都是静态的,Connector(连接器)的实现还负责监控外部系统(根据外部系统的变化可能需要重新配置)。
例如,在 JDBCSourceConnector 的例子中,Connector可分配一组表到每个任务。当创建一个新的表,它必须要发现这个新表,并更新到配置把新的表分配到任务中。当注意到一个变化,需要重新配置(或任务数量的变化),它通知框架更新相应的任务。
开发一个简单的连接器(Connector)
开发一个连接器只需要实现两个接口,Connector 和 Task。
在 Kafka 源代码里 file 包下有一个简单的例子。
- 该 connector 是用于独立模式,SourceConnector/SourceTask 实现文件每行读取,并作为记录(消息)用 SinkConnector/SinkTask 把每个记录写到一个文件。
connector例子:SourceConnector
【我们拿SourceConnector作为一个简单的例子。SinkConnector的实现也非常类似。】
- 通过创建一个继承 SourceConnector 的类开始,增加一个字段存储解析的配置信息(文件名读取和发送数据到topic):
public class FileStreamSourceConnector extends SourceConnector { private String filename; private String topic;
- 最简单的方法是 getTaskClass(),它定义了在工作进程中实例化的实际读取数据的类:
@Override public Class<? extends Task> getTaskClass() { return FileStreamSourceTask.class; }
- 定义 FileStreamSourceTask 类,接下来,我们增加一些标准的生命周期的方法,start()和stop():
@Override public void start(Map<String, String> props) { // The complete version includes error handling as well. filename = props.get(FILE_CONFIG); topic = props.get(TOPIC_CONFIG); } @Override public void stop() { // Nothing to do since no background monitoring is required. }
- 最后,是实现真正核心的 getTaskConfigs()。在这种情况下,我们只处理一个文件,这样即使我们生成更多的任务(根据maxTask参数),我们返回一个列表,只有一个入口:
@Override public List<Map<String, String>> getTaskConfigs(int maxTasks) { ArrayList>Map<String, String>> configs = new ArrayList<>(); // Only one input stream makes sense. Map<String, String> config = new Map<>(); if (filename != null) config.put(FILE_CONFIG, filename); config.put(TOPIC_CONFIG, topic); configs.add(config); return configs; }
虽然在本例中未使用,SourceTask 也提供了两个API来提交源系统的offset:commit 和 commitRecord。提供了有消息确认机制的源系统API。重写这些方法,允许source connector(源连接器)在源系统应答消息。一旦他们写入到kafka,无论消息是成批的还是单独。
- commit API在源系统存储offset,由poll返回offset。这个API的实现是阻塞的,直到提交完成。
- commitRecord API为在源系统中的每个写入到Kafka之后的SourceRecord保存offset,Kafka Connect自动记录offset。
SourceTasks不需要实现。在connector需要确认在源系统acknowledge(应答)消息的情况下,即使有多个任务,这种方法实现通常是非常简单的,只需要一个API。它只确定输入任务的数量,这可能需要它从远程服务提取数据。然后瓜分数据。
由于一些模式之间分配work(工作)非常普遍,有些实用工具提供了ConnectorUtils来简化这些情况,注意,这个简单的例子不包括动态输入。
Task例子:Source Task
- 和连接器一样,我们需要创建一个类(继承基于Task的类)。它也有一些标准的生命周期方法:
public class FileStreamSourceTask extends SourceTask<Object, Object> { String filename; InputStream stream; String topic; public void start(Map<String, String> props) { filename = props.get(FileStreamSourceConnector.FILE_CONFIG); stream = openOrThrowError(filename); topic = props.get(FileStreamSourceConnector.TOPIC_CONFIG); } @Override public synchronized void stop() { stream.close(); }
接下来,我们实现任务的主要功能,poll()方法。它从输入系统获取时间并返回一个List<SourceRecord>:
@Override public List<SourceRecord> poll() throws InterruptedException { try { ArrayList<SourceRecord> records = new ArrayList<>(); while (streamValid(stream) && records.isEmpty()) { LineAndOffset line = readToNextLine(stream); if (line != null) { Map<String, Object> sourcePartition = Collections.singletonMap("filename", filename); Map<String, Object> sourceOffset = Collections.singletonMap("position", streamOffset); records.add(new SourceRecord(sourcePartition, sourceOffset, topic, Schema.STRING_SCHEMA, line)); } else { Thread.sleep(1); } } return records; } catch (IOException e) { // Underlying stream was killed, probably as a result of calling stop. Allow to return // null, and driving thread will handle any shutdown if necessary. } return null; }
我们可以看到重要的步骤:poll()方法反复的调用,并每次调用都会尝试从文件中读取记录(消息)。读取每一行,也跟踪文件的offset。它使用该信息来创建一个输出SourceRecord和四条信息:源分区(只有1个,读取单个文件),源offset(在文件中的字节offset),输出topic的name,和输出value(行,包括一个模式,表示value始终是一个string)。
SourceRecord构造函数的其他实现也包括一个指定的输出分区和key。
- 注意,此实现使用正常的Java InputStream接口,如果数据不可用则可以sleep(休眠)。这个可以接受,因为Kafka Connect为每个任务提供了一个专用的线程。而任务实现必须基于poll()接口,这样有跟多的灵活性(自己实现)。在这种情况下,基于NIO的实现会更有效,但方法简单,快速实现,并兼容老版本(Java)。
Sink Tasks
不像 SourceConnector 和 SinkConnector, SourceTask 和 SinkTask 有很多不同的接口,因为 SourceTask 使用 pull 接口,SinkTask 使用 push 接口。
两者都有共同的生命周期的方法,但是SinkTask接口是完全不同的:
public abstract class SinkTask implements Task {
public void initialize(SinkTaskContext context) {
this.context = context;
}
public abstract void put(Collection<SinkRecord> records);
public abstract void flush(Map<TopicPartition, Long> offsets);
put()方法包含大部分的实现,接收集合SinkRecords,执行转换,并存储到目标系统。这个方法不需要确保返回之前数据完全写入到目标系统。事实上,在大部分情况下,内存缓冲是有用的,这样记录可以按一个整批次一次发送,从而减少插入事件进入downstream(下游)数据存储的开销。
SinkRecord作为SourceRecords包含相同的信息:Kafka topic,partition,offset和事件key和value。flush()方法在offset提交过程期间,它允许任务从故障中恢复,并从安全点恢复(这样就没有事件会被错过)。该方法应该将任何未完成的数据push到目标系统,然后阻塞,直到写入已得到确认。通常offset参数可以忽略,但在某些情况下,想要实现存储offset信息到目标系统以提供正好一次的语义。例如,HDFS connector(连接器)可以做到这一点,使用原子移动操作来确保flush()的原子性,确保提交数据和offset到最终的位置(HDFS)。
从之前的offset恢复(Resuming from Previous Offsets)
SourceTask包含一个流ID(输入的文件名)和每个记录的offset(文件中的位置)。框架使用了定时提交offset,所以在故障的情况下,任务恢复并减少再处理和可能重复的事件数(如果Kafka Connect正常的停止,可从最近的offset恢复,例如在独立模式或重新加载配置)。提交处理是完全自动化的,但只有connector知道如何返回到正确的位置,从该位置恢复。正确的恢复后,任务可以使用SourceContext传递其initialize()方法来访问offset数据。在initialize()中,我们会添加一些代码来读取offset(如果存在),并找到它的位置。
stream = new FileInputStream(filename);
Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
if (offset != null) {
Long lastRecordedOffset = (Long) offset.get("position");
if (lastRecordedOffset != null)
seekToOffset(stream, lastRecordedOffset);
}
当然,你可能需要为每个输入流读取大量的key。OffsetStorageReader接口也允许批量读取(有效的负载所有的offset),然后找出每个输入流到合适的位置。
动态的输入/输出流 (Dynamic Input/Output Streams)
Kafka Connect的工作被定义为拷贝大量数据。如拷贝一个完整的数据库,而不是创建多个job来分别复制每一张表。这种设计的后果是,一个connector的输入或输出流集合可以随着时间的推移而变化。Source connector需要监听源系统的改变。例如:数据库表的增加/删除。当发现改变,通过ConnectorContext对象通知框架,来重新加载。
例如,在SourceConnector:
if (inputsChanged())
this.context.requestTaskReconfiguration();
该框架将立即请求新配置并更新任务,在重新加载配置之前优雅的提交自己的进度。注意,SourceConnector检测目前留给connector实现,如果需要一个额外的线程执行此监控。那么connector必须分配它自己。理想的情况下,监控变更代码将会隔离Connector和任务,不需担心。然而,变更也可能影响任务,最常见的是,当其中一个输入流在输入系统销毁了。例如:如果一张表从数据库中删除。如果任务在Connector之前遇到问题,如果Connector需要poll(轮询)变更,则任务将需要处理随后的错误。这些都是常见的问题。值得庆幸的是,这可以通过简单catch和处理相应的异常。SinkConnectors通常只能处理流的增加,它可以转换输出新的entires(例如,一个新的数据库表)。该框架管理对Kafka的输入的任何变更。例如当输入的topic集变化(由一个正则表达式的订阅)。 SinkTasks等待新的输入流,它可能需要在下游系统创建新的资源。比如数据库中的新表,最棘手的情况是在这种情况产生的冲突(多个SinkTask看到一个新的输入流并同时尝试去创建新的资源)。SinkConnectors,另一方面,一般不需要特殊的代码来处理一组动态的流。
连接配置验证 (Connect Configuration Validation)
Kafka Connect允许你在提交要执行的connector之前来验证connector的配置,并可以提供故障和推荐值的反馈。利用这个优势,connector开发者需要提供一个config()的实现来暴露配置给框架。
下面的代码在 FileStreamSourceConnector 定义配置和暴露给框架。
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
.define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
public ConfigDef config() {
return CONFIG_DEF;
}
ConfigDef类用于指定预期的配置集,对于每个配置,你可以指定name,type,默认值,描述,group信息,group中的顺序,配置值的宽和适于在UI显示的名称。另外,你可以通过重写Validator类来指定的验证逻辑用于单个配置验证。此外,由于配置之间有可能存在依赖关系。例如,配置中的vaild和visibillty的值可能会根据其他的配置的值而变化。为了解决这个问题,ConfigDef允许你指定一个配置依赖,并提供推荐系统的实现来获取valid的值并设置visibillty得到的当前配置值。此外,Connector的validate()方法提供了一个默认验证实现,返回一个列表(返回允许配置的列表,每个配置的配置错误和推荐值)。然后,它不适用配置验证的推荐值。你可以提供一个自定义的配置验证覆盖的默认实现,这可能会使用建议的值。
Working with Schemas
FileStream connector是很好的例子,因为它很简单,但是也有很普通的结构化数据 — 每行只有一个字符串(string),实际connector都需要更复杂的数据格式模式,要创建更复杂的数据,你需要使用Kafka Connect data API。除了原始类型的影响,大多数记录的结构需要2个类:Schema 和 Struct。API文档有完整参考。
这里是一个简单的例子,创建一个Schema和Struct:
Schema schema = SchemaBuilder.struct().name(NAME)
.field("name", Schema.STRING_SCHEMA)
.field("age", Schema.INT_SCHEMA)
.field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
.build();
Struct struct = new Struct(schema)
.put("name", "Barbara Liskov")
.put("age", 75)
.build();
如果你实现一个source connector,你需要决定何时以及如何创建schema。如果可能的话,你应该尽量避免重复计算。例如,如果你的connector保证有一个固定的schema,用静态和使用单例,然而,大部分connector有动态的schema。一个简单的例子,一个数据库connector。甚至只考虑一张表,这个schema不会预定义整个connector(因为它表到表的变化),但它也不会固定为单表生命周期中,因为用户可能会ALTER TABLE(修改表)。connector必须能够检测这些变化并作出反应,Sink connector之所以简单,是因为它们消费数据,不需要创建shema。但是,它们应该同样的去关心验证它们收到的schema的格式是预期的。当schema不匹配 — 通常表示上游的生产者生产无效的数据不能被正确的转换到目标系统 — sink connectors抛出一个异常给系统。
Kafka Connect管理 (Kafka Connect Administration)
Kafka Connect的REST层提供了一组API来管理集群。这包括查看connector的配置和任务的状态,以及改变其当前的行为(例如改变配置和重新启动任务)。
当一个connector第一次被提交到集群,worker重新平衡集群中全部的connector和它们的任务。使每个worker具有大致相同的工作量。当connector递增和减少任务数,或connector配置发生变化时,也使用了同样的重新平衡程序。你可以使用REST API查看connector当前的状态和任务,包括每个分配worker的id。
例如,查询一个源文件的状态(使用 GET /connectors/file-source/status)可能会产生如下的输入:
{
"name": "file-source",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.1.208:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.1.209:8083"
}
]
}
Connector和它们的任务发布状态状态更新到共享topic(配置status.storage.topic)集群监控中的所有的worker。因为woker异步消费这个topic,在一个状态改变之前,有一个典型的(短)延迟是可见的(通过状态API)。
下列的状态可能是connector或是其任务之一:
- UNASSIGNED:connector/task 还未分配给worker.
- RUNNING:connector/task 正在运行.
- PAUSED:The connector/task has been administratively paused.
- FAILED:connector/tast故障(通常是抛出一个异常,状态输出报告)。
在多数情况下,connector和任务状态将匹配,尽管他们可能短时间不同(当发生变化或任务故障)。例如,当一个connector刚启动时,connector和其任务转换到运行状态之前,可能有明显的延时。当任务故障(因为Connect不会自动重启故障的任务) 状态还会出现分歧。手动的重启connector/任务时,可以使用上面列出的重启API。注意,如果你尝试去重启任务(这个任务正在rebalance),Connect将会返回一个409(冲突)状态代码。你可以在rebalance完成之后重试,但是没有必要,因为rebalance有效地重新启动集群中的所有connector和任务。
有时可以暂时的停止connector的消息处理。例如,如果远程系统正在维护,最好source connector停止poll,而不是一直报错误的异常日志刷屏。对于这个用例,Connect提供了一个暂停/恢复(pause/resume)的API。虽然source connector暂停了,Connect将停止poll额外的记录。当sink connector被暂停时,Connect将停止向它推送新的消息。暂停状态是持久性的。所以即使你重新启动集群,connector也不会再次启动消费处理,直到任务恢复。注意,connector的任务转换到PAUSED状态时有可能会有延迟。因为它需要在暂停的期间来完成所有处理。另外,失败的任务不会转换到PAUSED状态,直到它们重新启动。