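I'm new to Kafka, and I want to see whether I can use Kafka to sync MongoDB data with another system.
My setup:
- I'm running an AWS MSK cluster, and I manually created an EC2 instance with the Kafka client
- I've added the MongoDB Kafka Connect plugin to /usr/local/share/kafka/plugins
- I'm running Kafka Connect and can see it loading the plugin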
./bin/connect-standalone.sh ./config/connect-standalone.properties /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSourceConnector.properties
[2020-10-17 13:57:22,304] INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:264)
[2020-10-17 13:57:22,305] INFO Added plugin 'com.mongodb.kafka.connect.MongoSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
[2020-10-17 13:57:22,305] INFO Added plugin 'com.mongodb.kafka.connect.MongoSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:193)
- The unzipped plugin has this structure
Archive: mongodb-kafka-connect-mongodb-1.3.0.zip
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSourceConnector.properties
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/etc/MongoSinkConnector.properties
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/README.md
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/doc/LICENSE.txt
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/manifest.json
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/mongo-kafka-1.3.0-all.jar
creating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/mongodb-leaf.png
inflating: /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/assets/mongodb-logo.png
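The plugin is from the Confluent page; I also tried downloading it from the Maven page. The problem is that when I run Kafka Connect, it fails because the plugin is missing a Java dependency.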
[2020-10-17 13:57:24,898] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121)
java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: org/apache/avro/Schema
at com.mongodb.kafka.connect.source.MongoSourceConfig.createConfigDef(MongoSourceConfig.java:591)
at com.mongodb.kafka.connect.source.MongoSourceConfig.<clinit>(MongoSourceConfig.java:293)
at com.mongodb.kafka.connect.MongoSourceConnector.config(MongoSourceConnector.java:91)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
at com.mongodb.kafka.connect.MongoSourceConnector.validate(MongoSourceConnector.java:51)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:313)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
Caused by: java.lang.NoClassDefFoundError: org/apache/avro/Schema
... 8 more
Caused by: java.lang.ClassNotFoundException: org.apache.avro.Schema
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 8 more
My impression is that the plugin should look for its dependencies in the jar file /usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib/mongo-kafka-1.3.0-all.jar rather than in the Java SDK.
What am I missing in this setup?
A quick check should tell you whether the error is accurate...
jar -tf mongo-kafka-1.3.0-all.jar | grep avro
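If the JAR itself doesn't bundle Avro, then MSK most likely doesn't include Avro the way Confluent Platform does (which I think is mainly what Mongo builds their connector for). At the very least, Avro is not a dependency of Apache Kafka, so that would explain the error.
You'll need to download the Avro JAR and place it on the Kafka Connect classpath (or at least in the lib folder).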
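If there are several JARs to inspect, the same check can be scripted. This is only a rough sketch: the function name `jars_containing_class` is made up for illustration, and it assumes the JARs sit directly in one folder (for example the plugin's lib directory). It relies on the fact that a class such as org.apache.avro.Schema is stored inside a JAR as org/apache/avro/Schema.class.

```python
import zipfile
from pathlib import Path


def jars_containing_class(lib_dir, class_name):
    """Return the names of the JAR files in lib_dir that bundle class_name.

    Hypothetical helper for illustration; not part of Kafka or the connector.
    """
    # org.apache.avro.Schema -> org/apache/avro/Schema.class
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar in sorted(Path(lib_dir).glob("*.jar")):
        # A JAR is a ZIP archive, so the standard zipfile module can list it.
        with zipfile.ZipFile(jar) as archive:
            if entry in archive.namelist():
                matches.append(jar.name)
    return matches
```

Calling it as `jars_containing_class("/usr/local/share/kafka/plugins/mongodb-kafka-connect-mongodb-1.3.0/lib", "org.apache.avro.Schema")` returns an empty list if none of the JARs in that folder bundle Avro, which would be consistent with the ClassNotFoundException in the question.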
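I ran into the same problem when running locally. I had downloaded the jar (mongo-kafka-connect-1.6.0-confluent.jar) from Confluent Platform, which no longer provides the uber jar. So I searched for the uber jar and found it on the site below, where I could download it (select "all" in the download dropdown), and that solved the problem.
https://search.maven.org/search?q=a:mongo-kafka-connect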
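For anyone who prefers a direct link over the search page, here is a small sketch of how a Maven Central download URL is put together. The helper name `maven_central_jar_url` is hypothetical. The group ID `org.mongodb.kafka` and the host `repo1.maven.org` are my assumptions and are not stated anywhere in this thread, so verify them on the search page before relying on the result. The "all" classifier corresponds to the "all" entry in the download dropdown.

```python
def maven_central_jar_url(group_id, artifact_id, version, classifier=None):
    """Build a JAR URL following the standard Maven repository layout.

    Hypothetical helper; it only formats a string and does not check
    that the artifact actually exists.
    """
    base = "https://repo1.maven.org/maven2"  # assumed Maven Central host
    group_path = group_id.replace(".", "/")  # dots in the group ID become folders
    suffix = f"-{classifier}" if classifier else ""
    file_name = f"{artifact_id}-{version}{suffix}.jar"
    return f"{base}/{group_path}/{artifact_id}/{version}/{file_name}"


# Assumed coordinates for the uber jar; check them before downloading.
url = maven_central_jar_url("org.mongodb.kafka", "mongo-kafka-connect", "1.6.0", "all")
```

The resulting file would then replace the "-confluent" JAR in the plugin's lib folder.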