我正在使用Debezium MongoDB连接器来从Mongo的30GB集合进行流。
这是我的配置:
"config": {
"connector.class" : "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max" : "1",
"mongodb.hosts" : "",
"mongodb.name" : "",
"mongodb.user" : "",
"mongodb.password" : "",
"database.whitelist" : "mydb",
"collection.whitelist" : "mydb.activity",
"database.history.kafka.bootstrap.servers" : "kafka:9092",
"transforms": "unwrap",
"transforms.unwrap.type" : "io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope",
"key.converter" : "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url" : "http://schema-registry:8081",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url" : "http://schema-registry:8081",
"internal.key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"internal.value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"schema.compatibility" : "NONE"
}
起初,我得到了一个"为主题创建了太多模式",所以我添加了
"value.converter.max.schemas.per.subject" : "100000"
现在,在为主题值创建了许多模式后,Kafka-connect在许多模式中都大大减慢。
。我在kafka-streams应用程序中使用此主题,因此无法将SMT移至水槽(没有水槽连接器)
架构在收集项目之间进行了更改,但不超过500次,并且也向后兼容,所以我不明白为什么会创建这么多的模式。
任何建议都会有所帮助
我最终写了一个将架构保留在缓存中的SMT,以实现2个目标: 1.保留字段的顺序。 2.在每个记录架构中都有所有字段作为可选的。
这样,模式会进化,直到包含字段的所有选项,然后除非添加新字段,否则不再更改。