我想知道是否建议创建多个Kafka连接器来流式传输同一数据库或同一MongoDB集群内不同数据库中的多个集合数据。
我认为每个集群只有一个操作日志。因此,读取多个集合的数据很容易,这种方法将减少集群的负载。但我不确定将数据放在每个收藏的不同卡夫卡主题上会有多容易。而在创建多个连接器的第二种方法中。我觉得这会给服务器带来太多负载。
请提出建议的方法。
您可以监听来自多个mongo集合的多个更改流,只需要为pipeline
中的集合名称提供合适的Regex。您甚至可以通过提供Regex来排除一个或多个集合,从那里您不想监听任何更改流。
"pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"
您甚至可以使用$nin
排除任何给定的数据库,因为您不想监听任何更改流。
"pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/,"$nin":[/^any_database_name$/]}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"
回答您的问题:
- 从我的角度来看,为每个集合创建
N
个Kafka连接器将是一项开销,相反,我建议创建一个Kaf卡连接器。确保使用推荐的配置提供容错,只是不要依赖连接器的默认配置
以下是Kafka连接器的基本配置。
Mongo到Kafka源连接器
{
"name": "mongo-to-kafka-connect",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"publish.full.document.only": "true",
"tasks.max": "3",
"key.converter.schemas.enable": "false",
"topic.creation.enable": "true",
"poll.await.time.ms": 1000,
"poll.max.batch.size": 100,
"topic.prefix": "any prefix for topic name",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
"connection.uri": "mongodb://<username>:<password>@ip:27017,ip:27017,ip:27017,ip:27017/?authSource=admin&replicaSet=xyz&tls=true",
"value.converter.schemas.enable": "false",
"copy.existing": "true",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 3,
"topic.creation.compacted.cleanup.policy": "compact",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"mongo.errors.log.enable": "true",
"heartbeat.interval.ms": 10000,
"pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"
}
}
您可以从官方文档中获得更多详细信息。
- Mongo文档:https://www.mongodb.com/docs/kafka-connector/current/source-connector/
- 汇合文档:https://docs.confluent.io/platform/current/connect/index.html
- Regex:https://www.mongodb.com/docs/manual/reference/operator/query/regex/#mongodb-查询-op-正则表达式
- 配置属性:https://www.mongodb.com/docs/kafka-connector/current/source-connector/configuration-properties/