Kafka Connect JDBC Connector语言 - 由于不可恢复的异常,正在退出 WorkerSinkTask



我正在使用 JDBC 接收器连接器,并且在主题中收到一条错误消息。我知道为什么消息不好(由于生产者的问题,由于违反 FK 约束而失败(。工作人员任务报告的错误是:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
java.util.concurrent.FutureTask.run(FutureTask.java:266)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)nCaused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))ncom.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Cannot add or update a child row: a foreign key constraint fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` (`id`))n
io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86)
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)nt... 10 morenCaused by: java.sql.SQLException: java.sql.BatchUpdateException: 
Cannot add or update a child row: a foreign key constraint fails
(`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY (`sensorId`) REFERENCES `sensor` 
(`id`))ncom.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolation
Exception: Cannot add or update a child row: a foreign key constraint
fails (`sensorium`.`reading`, CONSTRAINT `reading_ibfk_1` FOREIGN KEY
(`sensorId`) REFERENCES `sensor` (`id`))

我想发生的是跳过这个坏消息。所以我尝试设置"errors.tolerance": "all".接收器连接器的完整配置如下所示:

{
    "name": "reading-sink2",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": 4,
        "topics": "READING_MYSQL",
        "key.converter.schema.registry.url": "http://localhost:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://localhost:8081",
        "connection.url": "jdbc:mysql://localhost:3306/sensorium?user=app&password=tQpRMCzHlAeu6kQIBk4U",
        "auto.create": true,
        "table.name.format": "reading",
        "errors.tolerance": "all"
    }
}

但是记录了相同的错误,没有跳过消息,也没有处理后续消息。

为什么errors.tolerance: all没有按预期工作?

errors.tolerance属性是指在转换(消息与 Kafka Connect 架构之间的转换(或转换消息(应用单消息转换(期间发生的错误。

您不能跳过/吞咽在SinkTask::put(Collection<SinkRecord> records)SourceTask::poll()期间引发的异常

在您的情况下,会引发异常SinkTask::put(...)

io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:86(

关于类似问题的问题:

  • Kafka Connect 接收器任务忽略容差限制
  • Kafka Connect - JDBC sink SQL Exception
  • Apache Kafka JDBC 连接器 - 序列化异常:未知魔术字节

您可以在以下博客中阅读有关此内容的更多信息:confluent页面:https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues

您可以使用kafka-consumer-groups工具手动跳过错误记录:

kafka-consumer-groups 
    --bootstrap-server kafka:29092 
    --group connect-sink_postgres_foo_00 
    --reset-offsets 
    --topic foo 
    --to-offset 2 
    --execute

有关更多信息,请参阅此处。

我已经记录了水槽的改进建议,请随时点赞:https://github.com/confluentinc/kafka-connect-jdbc/issues/721

最新更新