如何避免使用Kafka流丢失消息



我们有一个流应用程序,它使用来自源主题的消息,进行一些处理并将结果转发到目标主题。

消息的结构由一些avro模式控制。

如果架构尚未缓存,则在开始使用消息时,应用程序将尝试从架构注册表中检索它。如果由于任何原因模式注册表不可用(比如网络故障(,则当前正在处理的消息将丢失,因为默认处理程序是LogAndContinueExceptionHandler

o.a.k.s.e.LogAndContinueExceptionHandler : Exception caught during Deserialization, taskId: 1_5, topic: my.topic.v1, partition: 5, offset: 142768
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 62
Caused by: java.net.SocketTimeoutException: connect timed out
at java.base/java.net.PlainSocketImpl.socketConnect(Native Method) ~[na:na]
...
o.a.k.s.p.internals.RecordDeserializer : stream-thread [my-app-StreamThread-3] task [1_5] Skipping record due to deserialization error. topic=[my.topic.v1] partition=[5] offset=[142768]
...

因此,我的问题是,处理上述情况的正确方法是什么,并确保你无论如何都不会丢失信息。是否有现成的LogAndRollbackExceptionHandler错误处理程序或实现自己的方法?

提前感谢您的投入。

我没有在Kafka上做过很多工作,但当我做了很多工作时,我记得我遇到了一些问题,比如你在我们的系统中描述的问题。

让我告诉你我们是如何处理我们的场景的,也许这也会帮助你:

场景1:如果您的消息在发布端(publisher->kafka(丢失,您可以根据需要配置kafka确认设置,如果您将spring cloud stream与kafka一起使用,则属性为spring.cloud.stream.kafka.binder.required acks

可能值:

  1. 最多一次(Ack=0(

    1. 出版者不在乎卡夫卡是否承认
    2. 发送并忘记
    3. 数据可能丢失
  2. 至少一次(Ack=1(

    1. 若Kafka并没有确认,发布者会重新发送消息。

    2. 可能的重复。

    3. 在将消息复制到副本之前发送确认。

  3. 恰好一次(Ack=全部(

    1. 若Kafka并没有确认,发布者会重新发送消息。

    2. 然而,如果一条消息被发送给卡夫卡不止一次,就不会有重复。

    3. 内部序列号,用于决定消息是否已写入主题。

    4. 需要设置Min.insync.replicas属性,以确保在kafka向生产者确认之前需要同步的最小回复数。

场景2:如果您的数据在消费者端(kafka->消费者(丢失,您可以根据自己的使用情况更改kafka的自动提交功能。如果您使用的是Spring cloud streamSpring.cloud.stream.kafka.bindings.input.consumer.AutoCommitOffset.,则此属性为

默认情况下,AutoCommitOffset在kafka中为true,并且发送给消费者的每个消息都是";承诺";在卡夫卡的结尾,这意味着它不会再被发送了。但是,如果您将AutoCommitOffset更改为false,您将有权在代码中轮询来自kafka的消息,并且在完成工作后,将commit明确设置为true,让kafka知道您现在已经完成了消息处理。

如果消息未提交,kafka将继续重新发送,直到提交为止。

希望这能帮助你,或者至少为你指明正确的方向。

相关内容

  • 没有找到相关文章

最新更新