卡夫卡接收器连接器中无效 JSON 的错误处理



我有一个用于mongodb的接收器连接器,它从一个主题中获取json并将其放入mongoDB集合中。但是,当我从生产者向该主题发送无效的 JSON 时(例如,使用无效的特殊字符"(=>{"id":1,"name":""},连接器停止。我尝试使用错误.tolerance = all,但同样的事情正在发生。应该发生的是连接器应跳过并记录无效的 JSON,并保持连接器运行。我的分布式模式连接器如下所示:

{
"name": "sink-mongonew_test1",  
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics": "error7",
"connection.uri": "mongodb://****:27017", 
"database": "abcd", 
"collection": "abc",
"type.name": "kafka-connect",
"key.ignore": "true",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"value.projection.list": "id",
"value.projection.type": "whitelist",
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.UpdateOneTimestampsStrategy",
"delete.on.null.values": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"errors.deadletterqueue.topic.name": "crm_data_deadletterqueue",
"errors.deadletterqueue.topic.replication.factor": "1",
"errors.deadletterqueue.context.headers.enable": "true"
}
}

Apache Kafka 2.0 以来,Kafka Connect 包含错误处理选项,包括将消息路由到死信队列的功能,这是构建数据管道的常用技术。

https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/

如前所述,您使用的是connect-api-1.0.1.*.jar版本1.0.1,因此解释了为什么这些属性不起作用

除了运行较新版本的Kafka Connect之外,您的替代方案包括Nifi或Spark Structured Streaming

最新更新