如何在Spring Kafka Consumer中跳过损坏的(不可序列化的)消息



这个问题是针对Spring Kafka的,与具有高级消费者的Apache Kafka有关:跳过损坏的消息

有没有办法将Spring Kafka使用者配置为跳过无法读取/处理(已损坏(的记录?

我看到的情况是,如果不能反序列化,消费者就会被困在同一条记录上。这是消费者抛出的错误。

Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value 

消费者对主题进行轮询,并在循环中不断打印相同的错误,直到程序被终止。

在具有以下Consumer工厂配置的@KafkaListener中,

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

您需要ErrorHandlingDeserializer:https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#error-处理解串器

如果您不能移动到2.2版本,请考虑实现自己的版本,并为那些不能正确反序列化的记录返回null

源代码如下:https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/ErrorHandlingDeserializer2.java

如果您使用的是旧版本的kafka,请在@KafkaListener中设置以下Consumer工厂配置。

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);

以下是CustomDeserializer的代码:

import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
public class CustomDeserializer implements Deserializer<Object>
{
@Override
public void configure( Map<String, ?> configs, boolean isKey )
{
}
@Override
public Object deserialize( String topic, byte[] data )
{
ObjectMapper mapper = new ObjectMapper();
Object object = null;
try
{
object = mapper.readValue(data, Object.class);
}
catch ( Exception exception )
{
System.out.println("Error in deserializing bytes " + exception);
}
return object;
}
@Override
public void close()
{
}
}

由于我希望我的代码足够通用,可以读取任何类型的json,object=mapper.readValue(数据,object.class(;我正在将它转换为Object.class。由于我们在这里捕获异常,读取后不会重试。

最新更新