我正在Windows机器上配置Kafka Mongodb接收器连接器。
我的connect-standalone.properties文件有
plugin.path=E:/Tools/kafka_2.12-2.4.0/plugins
我的MongoSinkConnector.properties文件有
name=mongo-sink
topics=first_topic
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
key.ignore=true
# Specific global MongoDB Sink Connector configuration
connection.uri=mongodb://localhost:27017,mongo1:27017,mongo2:27017,mongo3:27017
database=test_kafka
collection=transactions
max.num.retries=3
retries.defer.timeout=5000
type.name=kafka-connect
在E:/Tools/kafka_2.12-2.4.0/plugins文件夹中,我有mongo-kafka-connect-1.0.1.jar文件。
命令
binwindowsconnect-standalone configconnect-standalone.properties configMongoSinkConnector.properties
我得到的错误是
[2020-03-23 04:04:12,376] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone)
java.lang.NoClassDefFoundError: com/mongodb/ConnectionString
at com.mongodb.kafka.connect.sink.MongoSinkConfig.createConfigDef(MongoSinkConfig.java:140)
at com.mongodb.kafka.connect.sink.MongoSinkConfig.<clinit>(MongoSinkConfig.java:78)
at com.mongodb.kafka.connect.MongoSinkConnector.config(MongoSinkConnector.java:62)
at org.apache.kafka.connect.connector.Connector.validate(Connector.java:129)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:313)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:194)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
Caused by: java.lang.ClassNotFoundException: com.mongodb.ConnectionString
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:588)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
我应该把其他哪些jar文件放在插件文件夹中,并且/或者我必须进行配置更改?
更新1我已经将mongoodb-driver-core-4.0.1和bson-4.0.1 jar文件也放在插件文件夹中,但出现了相同的错误。
最后,我可以让mongo-kafka连接器在Windows上工作。
以下是对我有效的方法:Kafka安装文件夹为E:\Tools\Kafka_2.12-2.4.0
E: \Tools\kafka_2.12-2.4.0\plugins有mongo-kafka-1.0.1-all.jar文件。
我从下载了这个https://www.confluent.io/hub/mongodb/kafka-connect-mongodb单击左侧的蓝色下载按钮以获得mongodb-kafka-connect-mongodb-1.0.1.zip文件。
zip文件中的etc文件夹中还有文件MongoSinkConnector.properties。将其移动到kafka_installation_folder\plugins
我的connect standalone.propertie文件包含以下条目:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=E:/Tools/kafka_2.12-2.4.0/plugins/mongo-kafka-1.0.1-all.jar
我的MongoSinkConnector.properties文件有以下条目
name=mongo-sink
topics=topic1,topic2
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
connection.uri=mongodb://localhost:27017,localhost:27017,localhost:27017
database=test_kafka
collection=transactions
max.num.retries=3
retries.defer.timeout=5000
field.renamer.mapping=[]
field.renamer.regex=[]
max.batch.size = 0
rate.limiting.timeout=0
rate.limiting.every.n=0
如何运行
在三个控制台中启动mongodb、zookeeper和kafka服务器。
在第4个控制台中,启动Kafka连接-
binwindowsconnect-standalone configconnect-standalone.properties configMongoSinkConnector.properties
在第五个控制台中,向主题发送消息(我为主题1做了(
binwindowskafka-console-producer --broker-list localhost:9092 --topic topic1
>{"你好":1}
>{"Mongo":2}
>{"世界":3}
打开一个mongo客户端并检查您的数据库/集合。您将看到这三条消息。