我正在使用MongoSourceConnector将kafka主题与mongo数据库集合连接起来。对于具有单个kafka主题的单个数据库,它工作得很好,但有没有任何方法可以为具有单个ka夫ka主题的多个mongo数据库进行连接。
如果您在分布式模式下运行kafka-connect,那么您可以使用上述配置创建另一个连接器配置文件
我真的不确定多个数据库和一个Kafka主题,但你肯定可以监听多个数据库的更改流并将数据推送到主题。由于主题创建依赖于database_name.collection_name
,因此您将拥有更多的主题。
您可以提供Regex来侦听pipeline
中的多个数据库。
"pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-names_.*/}},{"ns.coll":{"$regex":/^collection_name$/}}]}}]"
这是完整的Kafka连接器配置。
Mongo到Kafka源连接器
{
"name": "mongo-to-kafka-connect",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"publish.full.document.only": "true",
"tasks.max": "3",
"key.converter.schemas.enable": "false",
"topic.creation.enable": "true",
"poll.await.time.ms": 1000,
"poll.max.batch.size": 100,
"topic.prefix": "any prefix for topic name",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
"connection.uri": "mongodb://<username>:<password>@ip:27017,ip:27017,ip:27017,ip:27017/?authSource=admin&replicaSet=xyz&tls=true",
"value.converter.schemas.enable": "false",
"copy.existing": "true",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 3,
"topic.creation.compacted.cleanup.policy": "compact",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"mongo.errors.log.enable": "true",
"heartbeat.interval.ms": 10000,
"pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-names_.*/}},{"ns.coll":{"$regex":/^collection_name$/}}]}}]"
}
}
您可以从官方文档中获得更多详细信息。
- https://www.mongodb.com/docs/kafka-connector/current/source-connector/
- https://docs.confluent.io/platform/current/connect/index.html