我正在使用一个kafka消费者,其属性如下:
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.connect.json.JsonDeserializer
一个KafkaProducer(value.serializer=org.apache.kafka.connect.json.JsonSerializer)正在将JSON记录推送到一个Topic中,并且这个Consumer正在从中读取,功能方面它工作得很好,但是当我的生产者推送一个非JSON消息(例如:空消息)时问题就出现了。
在这种情况下,消费者正在下降,它将不会消费,直到空消息被清除(我已经将消费者组的偏移量重置为最新)。
有没有办法处理这个问题,也许使用一些属性或类似的东西
消费者API没有像Kafka Streams那样的反序列化异常处理属性
你需要创建你自己的反序列化器来包装json并处理任何错误
你可能会发现azkarra-commons中的SafeDeserializer类很有用