如何在Kafka Connect中重新处理消息?



我正在使用Kafka Sink Connector,它从Kafka主题读取数据并将数据放入目标数据库(在我的情况下是Neo4j实例).消息需要严格按顺序处理,因为它们不是幂等的。我的问题是,如果由于某种原因发生异常,例如1。数据库关闭,2。数据库连接丢失,3。模式解析失败,如何重新处理消息?

我知道我们可以运行error.tolerance=none配置并将失败消息重定向到死信队列。但我的问题是我们是否有办法再次处理选定的信息?此外,是否有任何审计机制来跟踪处理了多少消息,以从给定的偏移量(无需手动偏移量重置)中查找。

下面是我的连接器配置。还建议如果有更好的数据集成技术除了卡夫卡连接器下沉到目标数据库的数据。

{
"topics": "mytopic",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"tasks.max":"1",
"key.converter.schemas.enable":"true",
"values.converter.schemas.enable":"true",
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "none",
"errors.deadletterqueue.topic.name": "deadletter-topic",
"errors.deadletterqueue.topic.replication.factor":1,
"errors.deadletterqueue.context.headers.enable":true,
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.enhanced.avro.schema.support":true,
"value.converter.enhanced.avro.schema.support":true,
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"https://schema-url/",
"value.converter.basic.auth.credentials.source":"USER_INFO",
"value.converter.basic.auth.user.info":"user:pass",
"errors.log.enable": true,
"schema.ignore":"false",
"errors.log.include.messages": true,
"neo4j.server.uri": "neo4j://my-ip:7687/neo4j",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "neo4j",
"neo4j.encryption.enabled": false,
"neo4j.topic.cypher.mytopic": "MERGE (p:Loc_Con{name: event.geography.name})"
}

对于非致命异常,连接器将写入死信主题。

您需要另一个连接器或其他消费者来读取其他主题以处理该数据。因为它是一个主题,所以没有直接的方法来处理选中的消息

JMX指标或Neo4j数据库指标都应该能够告诉您在一段时间内大约处理了多少消息

最新更新