我将使用Kafka connect从MongoDB消费消息并发布到Kafka主题。
默认情况下,Mongo源连接器为每个集合创建一个主题。但是我会有很多集,我希望所有的集只有一个主题。消息将具有集合名称。
- 覆盖mongo-source连接器是更好的方法吗?如果是这样的话,我应该记住什么呢
- 是否已经提供了任何设置?我知道在创建
collection
时将其指定为空将侦听所有集合。但是它为每个集合创建一个主题。
正如@onecricketeer建议的那样,我使用RegexRouter。因此,不需要重写mongo源连接器来将所有集合中的文档发布到同一主题中。
这是我的配置,它监听所有与管道匹配的集合并发布到mongodbTopic
{
"name": "mongo-source",
"config": {
"tasks.max": "1",
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"connection.uri": "<connection_uri>",
"topic.prefix": "",
"pipeline": "[{"$match": { "$or": [{"fullDocument._kind":"collection"},{"fullDocument._isInverse":false}],"ns.coll": {"$regex": /^(.*_related)$|^(my_collection_test)$/}}}]",
"poll.await.time.ms": 5,
"poll.max.batch.size": 2000,
"transforms": "dropPrefix",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": ".*",
"transforms.dropPrefix.replacement": "mongodbTopic",
"errors.tolerance": "none",
"copy.existing": true
}
}
查看更多信息https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html