Kafka Connect Api - Getting Started?



我想了解Kafka连接模块的开发生命周期。

以下http://kafka.apache.org/090/documentation.html#connect,我想使用"connectapi"编写一个自定义的Kafka连接模块,但我不知道从哪里开始。有没有关于如何做到这一点的最简单的例子?项目设置等?

顺便说一句,这是我造的https://github.com/confluentinc/kafka-connect-jdbc并试图运行它(在谷歌云上),但我发现了错误——显然是一个缺失的依赖项,但我不知道该添加什么。当然,这可能只是在合流平台上运行。如果它能在其他地方运行,那就太好了。但如果不能,我想知道如何从头开始构建一个,因此我提出了这个问题。

java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$Recommender
    at io.confluent.connect.jdbc.JdbcSourceConnector.start(JdbcSourceConnector.java:66)
    at org.apache.kafka.connect.runtime.Worker.addConnector(Worker.java:186)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:197)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:145)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:85) 
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$Recommender
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 5 more
这类错误最常见的原因是CLASSPATH的配置。为了让KafkaConnect在运行时找到您的类,您需要将它们包含在类路径中。以下文本直接取自Kafka connect的文档:

安装一个新插件所需要的就是将它放在Kafka Connect进程的CLASSPATH中。所有运行Kafka Connect的脚本都将使用CLASSPATH环境变量,如果在调用时设置了该变量,则可以使用其他连接器插件轻松运行:

以及如何做到:export CLASSPATH=/path/to/my/connectors/* bin/connect-standalone standalone.properties new-custom-connector.properties

我还为Kafka Connect写了一本指南,你可能会觉得它很有用。

kafka-clients.jar更新为kafka-clients-0.10.0.0.jar。旧版本不包括此类:org/apache/kafka/common/config/ConfigDef$Recommender.class

您可以在此处下载:http://central.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.0.0/kafka-clients-0.10.0.0.jar

本可以将其作为注释添加,但SO表示我没有足够的分数。无论如何,这个答案的目的是证明JDBC连接器可以在不安装整个Confluent包和模式注册表的情况下运行。

我能够在不安装Confluent平台(特别是模式注册表)的情况下运行Confluent的JDBC连接器。运行连接器时,类路径中需要四个Confluent库(公共配置、公共度量、公共utils和kafka-connect)。有关详细说明,请参阅https://prefrontaldump.wordpress.com/2016/05/02/using-confluents-jdbc-connector-without-installing-the-entire-platform/

最新更新