Kafka Json值反序列化器



我正在使用一个kafka消费者,其属性如下:

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.connect.json.JsonDeserializer

一个KafkaProducer(value.serializer=org.apache.kafka.connect.json.JsonSerializer)正在将JSON记录推送到一个Topic中,并且这个Consumer正在从中读取,功能方面它工作得很好,但是当我的生产者推送一个非JSON消息(例如:空消息)时问题就出现了。

在这种情况下,消费者正在下降,它将不会消费,直到空消息被清除(我已经将消费者组的偏移量重置为最新)。

有没有办法处理这个问题,也许使用一些属性或类似的东西

消费者API没有像Kafka Streams那样的反序列化异常处理属性

你需要创建你自己的反序列化器来包装json并处理任何错误

你可能会发现azkarra-commons中的SafeDeserializer类很有用

相关内容

  • 没有找到相关文章

最新更新