目前,我设置了2个运行JDBC Sink Connector的独立连接器,以摄取从生产者生成的主题,并将其读入数据库。有时,我在日志中看到错误,这导致生成的消息无法存储到数据库中。我经常看到的错误是
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id:11
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'topic-io..models.avro.Topic' not found; error code404
这是真的因为TopicRecordName不应该指向这个主题而是我指向的另一个主题,它应该指向models.avro.Topic
我想知道这种情况是否经常发生,是否有一种方法可以在消息产生后重新摄取那些产生的记录/消息到数据库中。例如,如果消息在12点到1点之间产生,并且日志中出现了某种错误,并且在该时间段内无法使用这些消息,则配置或偏移量可以通过将其重新摄取到数据库来恢复它。该错误是由于架构注册表链接无法读取/链接到正确的架构链接。它失败了,因为它读取了不正确的工作文件,因为我的一个工作文件有value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
,而另一个连接器没有读取subjectName。
目前,我设置consumer.auto.offset.reset=earliest
开始读取消息。是否有一种方法可以将这些数据返回到文件中,并且我可以恢复这些数据,因为我正在部署到生产中,并且必须始终将数据消耗到数据库中,而不会出现任何错误。
您可以使用死信队列配置将错误记录发送到新主题,而不是混淆消费者组偏移量,这将最终导致正确处理的数据再次被消费和复制,您需要在主题保留完全丢弃事件之前监视和消费该主题
https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
我的一个worker文件有一个[different config]
这就是为什么配置管理软件很重要。不要在没有更新所有服务器的进程的情况下修改分布式系统中的一台服务器。如果你没有在Kubernetes中运行连接器,Ansible/Terraform是最常见的