我有一个简单的基于Kafka Streams的Spring应用程序,它使用来自传入主题的消息,执行map
转换并打印此消息。 KStream
这样配置
@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
PrintAction printAction, String topicName) {
KStream<String, JsonNode> source = builder.stream(topicName,
Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
// @formatter:off
source
.map(myTransformer)
.foreach(printAction);
// @formatter:on
return source;
}
在MyTransformer
内部,我正在调用此时可能关闭的外部微服务。如果调用失败(通常抛出RuntimeException
(,我无法进行转换。
问题是,如果在以前的处理过程中发生任何错误,有什么方法可以再次重新处理Streams应用程序中的消息吗?
根据我目前的研究,这里没有办法这样做,我唯一的可能性是将消息推送到死信主题中,并在将来尝试处理它,如果它再次失败,我再次将其推送到 DLT 中并以这种方式重试。
如果在 Kafka 流处理过程中发生任何未捕获的异常,您的流将状态更改为 ERROR,并停止使用发生错误的分区的传入消息。您需要自己捕获异常。重试可以实现:1(使用Spring RetryTemplate
调用外部微服务(但请记住,使用来自特定分区的消息会有延迟(,或者2(将失败的消息推送到另一个主题以供以后重新处理(如您所建议的(
自kafka-streams
2.8.0
年以来的更新
自 kafka-streams
2.8.0
起,您可以自动替换失败的流线程(由未捕获的异常引起(使用KafkaStreams
方法void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh);
StreamThreadExceptionResponse.REPLACE_THREAD
.有关更多详细信息,请查看 Kafka Streams 特定的未捕获异常处理程序
kafkaStreams.setUncaughtExceptionHandler(ex -> {
log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});