Mongo Kafka连接器集合监听限制



我们在Mongo中有几个基于n个租户的集合,并且希望kafka连接器只监视特定的集合。

下面是我的蒙古资源。属性文件,在其中添加了管道过滤器以仅侦听特定集合。它是

pipeline=[{$match:{“ns.coll”:{"$in":[“ecom-tesla-cms-instance”,“ca-tesla-cms-instance”,“ecom-tesla-cms-page”,“ca-tesla-cms-page”]}}}]

收藏将来可能会增加到200个需要观看的收藏,想知道以下三件事

  1. 一个连接器侦听大量集合是否会对性能产生影响?
  2. 一个连接器可以观看的集合有限制吗?
  3. 最佳实践是运行一个连接器侦听100个集合还是运行10个不同的连接器每个侦听10个集合?

最佳实践是运行多个连接器,其中"许多";这取决于你维护它们所有开销的能力。

原因是-一个连接器创建了一个单点故障(每个任务,但一次只能将一个任务分配给任何集合,以防止重复)。如果Connect任务失败并出现不可重试的错误,则连接器的任务将完全停止,并停止从分配给该连接器的所有集合中读取数据。

您还可以尝试Debezium,它可能比Mongo源连接器使用更少的资源,因为它充当副本而不是间隔查询集合。

您可以从多个mongo集合中侦听多个更改流,您只需要为pipeline中的集合名称提供合适的Regex。你甚至可以通过提供你不想听任何更改流的Regex来排除集合/集合。

"pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"  

您甚至可以使用$nin排除任何给定的数据库,您不想侦听任何更改流。

"pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/,"$nin":[/^any_database_name$/]}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"

回答你的问题:

  1. 一个连接器侦听大量集合是否会对性能产生影响?

    • 据我所知,我不这么认为,因为在文档中没有提到它。您可以使用单个连接器侦听多个mongo集合。
  2. 一个连接器可以观看的集合有限制吗?

    • 据我所知,文档中没有提到限制。
  3. 最佳实践是运行一个连接器侦听100个集合还是运行10个不同的连接器每个侦听10个集合?

    • 从我的角度来看,为每个集合创建一个N数量的Kafka连接器将是一个开销,确保使用推荐的配置提供容错,只是不要依赖连接器的默认配置。

这是Kafka连接器的基本配置。

Mongo到Kafka源连接器

{
"name": "mongo-to-kafka-connect",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"publish.full.document.only": "true",
"tasks.max": "3",
"key.converter.schemas.enable": "false",
"topic.creation.enable": "true",
"poll.await.time.ms": 1000,
"poll.max.batch.size": 100,
"topic.prefix": "any prefix for topic name",
"output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
"connection.uri": "mongodb://<username>:<password>@ip:27017,ip:27017,ip:27017,ip:27017/?authSource=admin&replicaSet=xyz&tls=true",
"value.converter.schemas.enable": "false",
"copy.existing": "true",
"topic.creation.default.replication.factor": 3,
"topic.creation.default.partitions": 3,
"topic.creation.compacted.cleanup.policy": "compact",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"mongo.errors.log.enable": "true",
"heartbeat.interval.ms": 10000,
"pipeline": "[{"$match":{"$and":[{"ns.db":{"$regex":/^database-name$/}},{"ns.coll":{"$regex":/^collection_.*/}}]}}]"
}
}

您可以从官方文档获得更多详细信息。

  • Mongo docs: https://www.mongodb.com/docs/kafka-connector/current/source-connector/
  • Confluent docs: https://docs.confluent.io/platform/current/connect/index.html
  • 正则表达式:https://www.mongodb.com/docs/manual/reference/operator/query/regex/mongodb-query-op.-regex

最新更新