通常,当我们同时定义类级 @KafkaListener 和方法级 @KafkaHandler 时,可以把其中一个 @KafkaHandler 声明为默认处理方法(isDefault = true),用来处理类型不符合预期的有效负载。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#class-level-kafkalistener
但是,如果我们没有默认方法,我们该怎么办呢?
在2.6及以后的版本中,您可以通过检查异常将SeekToCurrentErrorHandler
配置为立即将此类消息发送到死信主题。
/**
 * Demo application: records whose payload type has no matching
 * {@code @KafkaHandler} method are sent to a dead-letter topic immediately,
 * while every other listener failure is retried with a fixed back-off.
 *
 * <p>On startup it publishes an {@code Integer}, a {@code Double} and a
 * {@code String} to {@code so59256214}; a second listener in this class prints
 * whatever arrives on {@code so59256214.DLT}.
 */
@SpringBootApplication
public class So59256214Application {

    /** Message fragment used to recognise the "no handler for this payload type" failure. */
    private static final String NO_HANDLER_MESSAGE = "No method found for class";

    public static void main(String[] args) {
        SpringApplication.run(So59256214Application.class, args);
    }

    /** Declares the main topic (single partition, single replica). */
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so59256214").partitions(1).replicas(1).build();
    }

    /**
     * Declares the dead-letter topic (single partition, single replica).
     * NOTE(review): presumably this matches the recoverer's default
     * "&lt;topic&gt;.DLT" destination naming - confirm against the Spring Kafka docs.
     */
    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so59256214.DLT").partitions(1).replicas(1).build();
    }

    /**
     * Prints every record received from the dead-letter topic.
     *
     * @param in the dead-lettered record
     */
    @KafkaListener(id = "so59256214.DLT", topics = "so59256214.DLT")
    void listen(ConsumerRecord<?, ?> in) {
        System.out.println("dlt: " + in);
    }

    /**
     * Publishes three sample records at startup: an {@code Integer}, a
     * {@code Double}, and a {@code String}.
     *
     * @param template template used to publish the sample records
     * @return the runner executed once the application has started
     */
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            template.send("so59256214", 42);
            template.send("so59256214", 42.0);
            template.send("so59256214", "No handler for this");
        };
    }

    /**
     * Builds the container error handler. Recovery is delegated to a
     * {@link DeadLetterPublishingRecoverer}; the back-off is chosen per failure
     * from the exception message.
     *
     * @param template operations used by the recoverer to publish failed records
     * @return the configured error handler
     */
    @Bean
    ErrorHandler eh(KafkaOperations<String, Object> template) {
        SeekToCurrentErrorHandler eh = new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template));
        // Zero interval, zero attempts: go straight to recovery.
        BackOff neverRetryOrBackOff = new FixedBackOff(0L, 0);
        // Interval 2000 (ms per FixedBackOff's contract), 3 attempts.
        BackOff normalBackOff = new FixedBackOff(2000L, 3);
        eh.setBackOffFunction((rec, ex) -> {
            // getMessage() may be null; treat that as an ordinary failure
            // rather than throwing a NullPointerException from the error handler.
            String message = ex.getMessage();
            boolean noHandler = message != null && message.contains(NO_HANDLER_MESSAGE);
            return noHandler ? neverRetryOrBackOff : normalBackOff;
        });
        return eh;
    }

}
/**
 * Class-level listener on topic {@code so59256214}. It declares one handler
 * for {@code Integer} payloads and one for {@code Double} payloads, and no
 * default handler.
 */
@Component
@KafkaListener(id = "so59256214", topics = "so59256214")
class Listener {

    /**
     * Prints an {@code Integer} payload.
     *
     * @param in the received payload
     */
    @KafkaHandler
    void integerHandler(Integer in) {
        emit("int: ", in);
    }

    /**
     * Prints a {@code Double} payload.
     *
     * @param in the received payload
     */
    @KafkaHandler
    void doubleHandler(Double in) {
        emit("double: ", in);
    }

    /** Writes the prefix followed by the payload's string form as one stdout line. */
    private static void emit(String prefix, Object payload) {
        System.out.println(prefix + payload);
    }

}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
结果:
int: 42
double: 42.0
dlt: ConsumerRecord(topic = so59256214.DLT, ...