spring kafka:从负载中过滤KafkaNull值



我的消费者配置如下:

@Bean
public Consumer<Message<List<Foo>>> consumer() {
return message ->  {
message.getPayload().forEach(it -> {
// process
})
};
}

我还配置了ErrorHandlingDeserializer,它可能在消息有效负载收集中产生KafkaNull值。问题,我不能过滤这样的值,因为访问集合与forEach()方法产生ClassCastException:java.lang.ClassCastException: class org.springframework.kafka.support.KafkaNull cannot be cast to class Foo

我如何从处理中排除KafkaNull值(不将消费者签名更改为Consumer<Message<List<Object>>>)?

所以我们目前没有过滤函数。我确实提出了这个问题- https://github.com/spring-cloud/spring-cloud-function/issues/736

但是现在对你来说最好的方法是这样做:

public Consumer<Message<List<Object>>> consumer() {
return message ->  {
message.getPayload().stream().filter(it -> it instanceof Foo).forEach(it -> {
// process
})
};
}

最新更新