在 Kafka 流中处理消息时发生错误时重新处理消息



我有一个简单的基于Kafka Streams的Spring应用程序,它使用来自传入主题的消息,执行map转换并打印此消息。 KStream这样配置

@Bean
public KStream<?, ?> processingPipeline(StreamsBuilder builder, MyTransformer myTransformer,
         PrintAction printAction, String topicName) {
    KStream<String, JsonNode> source = builder.stream(topicName,
                Consumed.with(Serdes.String(), new JsonSerde<>(JsonNode.class)));
    // @formatter:off
    source
        .map(myTransformer)
        .foreach(printAction);
    // @formatter:on
    return source;
}

MyTransformer内部,我正在调用此时可能关闭的外部微服务。如果调用失败(通常抛出RuntimeException(,我无法进行转换。

问题是,如果在以前的处理过程中发生任何错误,有什么方法可以再次重新处理Streams应用程序中的消息吗?

根据我目前的研究,这里没有办法这样做,我唯一的可能性是将消息推送到死信主题中,并在将来尝试处理它,如果它再次失败,我再次将其推送到 DLT 中并以这种方式重试。

如果在 Kafka 流处理过程中发生任何未捕获的异常,您的流将状态更改为 ERROR,并停止使用发生错误的分区的传入消息。您需要自己捕获异常。重试可以实现:1(使用Spring RetryTemplate调用外部微服务(但请记住,使用来自特定分区的消息会有延迟(,或者2(将失败的消息推送到另一个主题以供以后重新处理(如您所建议的(

<小时 />

kafka-streams 2.8.0年以来的更新

kafka-streams 2.8.0 起,您可以自动替换失败的流线程(由未捕获的异常引起(使用KafkaStreams方法void setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler eh); StreamThreadExceptionResponse.REPLACE_THREAD .有关更多详细信息,请查看 Kafka Streams 特定的未捕获异常处理程序

kafkaStreams.setUncaughtExceptionHandler(ex -> {
    log.error("Kafka-Streams uncaught exception occurred. Stream will be replaced with new thread", ex);
    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});

相关内容

  • 没有找到相关文章

最新更新